This file is a merged representation of a subset of the codebase, containing files not matching ignore patterns, combined into a single document by Repomix. The content has been processed: empty lines have been removed.

================================================================
File Summary
================================================================

Purpose:
--------
This file contains a packed representation of a subset of the repository's contents.
It is designed to be easily consumable by AI systems for analysis, code review,
or other automated processes.

File Format:
------------
The content is organized as follows:
1. This summary section
2. Repository information
3. Directory structure
4. Multiple file entries, each consisting of:
  a. A separator line (================)
  b. The file path (File: path/to/file)
  c. Another separator line
  d. The full contents of the file
  e. A blank line

Usage Guidelines:
-----------------
- This file should be treated as read-only. Any changes should be made to the
  original repository files, not this packed version.
- When processing this file, use the file path to distinguish
  between different files in the repository.
- Be aware that this file may contain sensitive information. Handle it with
  the same level of security as you would the original repository.

Notes:
------
- Some files may have been excluded based on .gitignore rules and Repomix's configuration
- Binary files are not included in this packed representation. Refer to the Directory Structure section for a complete list of file paths, including binary files
- Files matching these patterns are excluded: .specstory/**/*.md, .venv/**, _private/**, CLEANUP.txt, **/*.json, *.lock
- Files matching patterns in .gitignore are excluded
- Files matching default ignore patterns are excluded
- Empty lines have been removed from all files

================================================================
Directory Structure
================================================================
.cursor/
  rules/
    0project.mdc
    cleanup.mdc
    filetree.mdc
    quality.mdc
.github/
  workflows/
    push.yml
    release.yml
examples/
  upload_example.py
src/
  twat_fs/
    upload_providers/
      __init__.py
      bashupload.py
      catbox.py
      core.py
      dropbox.py
      fal.py
      filebin.py
      litterbox.py
      pixeldrain.py
      protocols.py
      s3.py
      simple.py
      types.py
      uguu.py
      utils.py
      www0x0.py
    __init__.py
    __main__.py
    cli.py
    py.typed
    upload.py
tests/
  data/
    test.txt
  __init__.py
  test_filebin_pixeldrain.py
  test_integration.py
  test_s3_advanced.py
  test_twat_fs.py
  test_upload.py
.gitignore
.pre-commit-config.yaml
cleanup.py
LICENSE
LOG.md
MANIFEST.in
mypy.ini
pyproject.toml
README.md
TODO.md
update_providers.py
VERSION.txt

================================================================
Files
================================================================

================
File: .cursor/rules/0project.mdc
================
---
description: About this project
globs:
---
# About this project

`twat-fs` is a file system utility library focused on robust and extensible file upload capabilities with multiple provider support. It provides:

- Multi-provider upload system with smart fallback (catbox.moe default, plus Dropbox, S3, etc.)
- Automatic retry for temporary failures, fallback for permanent ones
- URL validation and clean developer experience with type hints
- Simple CLI: `python -m twat_fs upload_file path/to/file.txt`
- Easy installation: `uv pip install twat-fs` (basic) or `uv pip install 'twat-fs[all,dev]'` (all features)
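
The retry-then-fallback behaviour listed above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the library's implementation: the function `upload_with_fallback`, the `providers` mapping of names to callables, the injectable `sleep` parameter, and all default values are hypothetical. Only the exception names `RetryableError` and `NonRetryableError` and the capped exponential backoff formula mirror identifiers that appear in `upload_providers/core.py`.

```python
import time
from collections.abc import Callable


class RetryableError(Exception):
    """Temporary failure: worth retrying with the same provider."""


class NonRetryableError(Exception):
    """Permanent failure: move on to the next provider."""


def upload_with_fallback(
    file_path: str,
    providers: dict[str, Callable[[str], str]],  # hypothetical: name -> upload callable
    max_attempts: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Try providers in order; retry temporary errors, fall back on permanent ones."""
    errors: list[str] = []
    for name, upload in providers.items():
        for attempt in range(max_attempts):
            try:
                return upload(file_path)
            except RetryableError as e:
                errors.append(f"{name}: {e}")
                if attempt < max_attempts - 1:
                    # Exponential backoff, capped at max_delay.
                    sleep(min(initial_delay * (2**attempt), max_delay))
            except NonRetryableError as e:
                errors.append(f"{name}: {e}")
                break  # do not retry; fall back to the next provider
    msg = "All providers failed: " + "; ".join(errors)
    raise RuntimeError(msg)
```

Passing `sleep` as a parameter is a design choice made only for this sketch: it lets a test replace the real delay with a recorder and check the backoff sequence without waiting.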

## Development Notes
- Uses `uv` for Python package management
- Quality tools: ruff, mypy, pytest
- Clear provider protocol for adding new storage backends
- Strong typing and runtime checks throughout

================
File: .cursor/rules/cleanup.mdc
================
---
description: Run `cleanup.py` script before and after changes
globs: 
---
Before you make any changes, or whenever I say "cleanup", run the `cleanup.py update` script in the main folder. Analyze the results, describe recent changes in @LOG.md, and edit @TODO.md to update priorities and plan the next changes. PERFORM THE CHANGES, then run the `cleanup.py status` script and react to the results.

When you edit @TODO.md, start each item with an empty GFM checkbox (`- [ ] `) if it isn't done, or a filled one (`- [x] `) if it is.

================
File: .cursor/rules/filetree.mdc
================
---
description: File tree of the project
globs: 
---
[ 992]  .
├── [  64]  .benchmarks
├── [  96]  .cursor
│   └── [ 224]  rules
│       ├── [ 821]  0project.mdc
│       ├── [ 516]  cleanup.mdc
│       ├── [2.5K]  filetree.mdc
│       └── [2.0K]  quality.mdc
├── [  96]  .github
│   └── [ 128]  workflows
│       ├── [2.7K]  push.yml
│       └── [1.4K]  release.yml
├── [3.5K]  .gitignore
├── [ 532]  .pre-commit-config.yaml
├── [1.0K]  LICENSE
├── [2.9K]  LOG.md
├── [ 153]  MANIFEST.in
├── [ 12K]  README.md
├── [5.2K]  TODO.md
├── [   7]  VERSION.txt
├── [ 11K]  cleanup.py
├── [  96]  dist
├── [  96]  examples
│   └── [ 948]  upload_example.py
├── [ 306]  filetree.sh
├── [ 439]  mypy.ini
├── [7.0K]  pyproject.toml
├── [ 128]  src
│   └── [ 384]  twat_fs
│       ├── [ 447]  __init__.py
│       ├── [ 733]  __main__.py
│       ├── [8.9K]  cli.py
│       ├── [ 128]  data
│       │   ├── [1.5K]  _test.jpg
│       │   └── [383K]  test.jpg
│       ├── [   1]  py.typed
│       ├── [ 24K]  upload.py
│       └── [ 640]  upload_providers
│           ├── [3.5K]  __init__.py
│           ├── [6.1K]  bashupload.py
│           ├── [5.3K]  catbox.py
│           ├── [ 12K]  core.py
│           ├── [ 20K]  dropbox.py
│           ├── [8.5K]  fal.py
│           ├── [5.0K]  filebin.py
│           ├── [ 10K]  litterbox.py
│           ├── [6.1K]  pixeldrain.py
│           ├── [4.0K]  protocols.py
│           ├── [5.1K]  s3.py
│           ├── [6.0K]  simple.py
│           ├── [ 728]  types.py
│           ├── [4.3K]  uguu.py
│           ├── [6.8K]  utils.py
│           └── [4.1K]  www0x0.py
├── [ 352]  tests
│   ├── [  63]  __init__.py
│   ├── [  96]  data
│   │   └── [ 100]  test.txt
│   ├── [3.1K]  test_filebin_pixeldrain.py
│   ├── [9.0K]  test_integration.py
│   ├── [8.7K]  test_s3_advanced.py
│   ├── [ 180]  test_twat_fs.py
│   └── [ 29K]  test_upload.py
├── [2.8K]  update_providers.py
└── [382K]  uv.lock

14 directories, 51 files

================
File: .cursor/rules/quality.mdc
================
---
description: Quality
globs: 
---
- **Verify Information**: Always verify information before presenting it. Do not make assumptions or speculate without clear evidence.
- **No Apologies**: Never use apologies.
- **No Whitespace Suggestions**: Don't suggest whitespace changes.
- **No Inventions**: Don't invent major changes other than what's explicitly requested.
- **No Unnecessary Confirmations**: Don't ask for confirmation of information already provided in the context.
- **Preserve Existing Code**: Don't remove unrelated code or functionalities. Pay attention to preserving existing structures.
- **No Implementation Checks**: Don't ask the user to verify implementations that are visible in the provided context.
- **No Unnecessary Updates**: Don't suggest updates or changes to files when there are no actual modifications needed.
- **No Current Implementation**: Don't show or discuss the current implementation unless specifically requested.
- **Use Explicit Variable Names**: Prefer descriptive, explicit variable names over short, ambiguous ones to enhance code readability.
- **Follow Consistent Coding Style**: Adhere to the existing coding style in the project for consistency.
- **Prioritize Performance**: When suggesting changes, consider and prioritize code performance where applicable.
- **Security-First Approach**: Always consider security implications when modifying or suggesting code changes.
- **Test Coverage**: Suggest or include appropriate unit tests for new or modified code.
- **Error Handling**: Implement robust error handling and logging where necessary.
- **Modular Design**: Encourage modular design principles to improve code maintainability and reusability.
- **Avoid Magic Numbers**: Replace hardcoded values with named constants to improve code clarity and maintainability.
- **Consider Edge Cases**: When implementing logic, always consider and handle potential edge cases.
- **Use Assertions**: Include assertions wherever possible to validate assumptions and catch potential errors early.
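
As a small illustration of the last few rules (explicit names, named constants, assertions), a helper might look like the sketch below. The constant `CHUNKED_UPLOAD_THRESHOLD_BYTES`, its value, and the function `needs_chunked_upload` are hypothetical and do not come from this project.

```python
# Hypothetical constant: the value is chosen for illustration only.
CHUNKED_UPLOAD_THRESHOLD_BYTES = 8 * 1024 * 1024


def needs_chunked_upload(file_size_bytes: int) -> bool:
    """Return True when a file is larger than the chunking threshold."""
    # The assertion documents and enforces the assumption about the input.
    assert file_size_bytes >= 0, "file size cannot be negative"
    return file_size_bytes > CHUNKED_UPLOAD_THRESHOLD_BYTES
```

Naming the threshold keeps the comparison readable and gives a single place to change the value.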

================
File: .github/workflows/push.yml
================
name: Build & Test
on:
  push:
    branches: [main]
    tags-ignore: ["v*"]
  pull_request:
    branches: [main]
  workflow_dispatch:
permissions:
  contents: write
  id-token: write
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true
jobs:
  quality:
    name: Code Quality
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Run Ruff lint
        uses: astral-sh/ruff-action@v3
        with:
          version: "latest"
          args: "check --output-format=github"
      - name: Run Ruff Format
        uses: astral-sh/ruff-action@v3
        with:
          version: "latest"
          args: "format --check --respect-gitignore"
  test:
    name: Run Tests
    needs: quality
    strategy:
      matrix:
        python-version: ["3.10", "3.11", "3.12"]
        os: [ubuntu-latest]
      fail-fast: true
    runs-on: ${{ matrix.os }}
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install UV
        uses: astral-sh/setup-uv@v5
        with:
          version: "latest"
          python-version: ${{ matrix.python-version }}
          enable-cache: true
          cache-suffix: ${{ matrix.os }}-${{ matrix.python-version }}
      - name: Install test dependencies
        run: |
          uv pip install --system --upgrade pip
          uv pip install --system ".[test]"
      - name: Run tests with Pytest
        run: uv run pytest -n auto --maxfail=1 --disable-warnings --cov-report=xml --cov-config=pyproject.toml --cov=src/twat_fs --cov=tests tests/
      - name: Upload coverage report
        uses: actions/upload-artifact@v4
        with:
          name: coverage-${{ matrix.python-version }}-${{ matrix.os }}
          path: coverage.xml
  build:
    name: Build Distribution
    needs: test
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install UV
        uses: astral-sh/setup-uv@v5
        with:
          version: "latest"
          python-version: "3.12"
          enable-cache: true
      - name: Install build tools
        run: uv pip install build hatchling hatch-vcs
      - name: Build distributions
        run: uv run python -m build --outdir dist
      - name: Upload distribution artifacts
        uses: actions/upload-artifact@v4
        with:
          name: dist-files
          path: dist/
          retention-days: 5

================
File: .github/workflows/release.yml
================
name: Release
on:
  push:
    tags: ["v*"]
permissions:
  contents: write
  id-token: write
jobs:
  release:
    name: Release to PyPI
    runs-on: ubuntu-latest
    environment:
      name: pypi
      url: https://pypi.org/p/twat-fs
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install UV
        uses: astral-sh/setup-uv@v5
        with:
          version: "latest"
          python-version: "3.12"
          enable-cache: true
      - name: Install build tools
        run: uv pip install build hatchling hatch-vcs
      - name: Build distributions
        run: uv run python -m build --outdir dist
      - name: Verify distribution files
        run: |
          ls -la dist/
          test -n "$(find dist -name '*.whl')" || (echo "Wheel file missing" && exit 1)
          test -n "$(find dist -name '*.tar.gz')" || (echo "Source distribution missing" && exit 1)
      - name: Publish to PyPI
        uses: pypa/gh-action-pypi-publish@release/v1
        with:
          password: ${{ secrets.PYPI_TOKEN }}
      - name: Create GitHub Release
        uses: softprops/action-gh-release@v1
        with:
          files: dist/*
          generate_release_notes: true
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

================
File: examples/upload_example.py
================
async def upload_file(file_path: str, service: str = "bash") -> str:
    uploader = get_uploader(service)
    result = await uploader.upload_file(Path(file_path))
        raise Exception(msg)
def main(file_path: str, service: str = "bash"):
    return asyncio.run(upload_file(file_path, service))
    fire.Fire(main)

================
File: src/twat_fs/upload_providers/__init__.py
================
def get_provider_module(provider: str) -> Provider | None:
        if provider.lower() == "simple":
            module = importlib.import_module(f".{provider}", __package__)
            if "No module named" in str(e):
                logger.debug(f"Provider module {provider} not found")
                logger.warning(f"Error importing provider {provider}: {e}")
        missing_attrs = [attr for attr in required_attrs if not hasattr(module, attr)]
            logger.warning(
                f"Provider {provider} is missing required attributes: {', '.join(missing_attrs)}"
        if not hasattr(module, "PROVIDER_HELP"):
            logger.warning(f"Provider {provider} is missing help information")
        return cast(Provider, module)
        logger.error(f"Unexpected error loading provider {provider}: {e}")
def get_provider_help(provider: str) -> ProviderHelp | None:
        module = get_provider_module(provider)
                return getattr(module, "PROVIDER_HELP", None)
        logger.error(f"Error getting help for provider {provider}: {e}")

================
File: src/twat_fs/upload_providers/bashupload.py
================
class BashUploadProvider(Provider, ProviderClient):
    def get_credentials(cls) -> None:
    def get_provider(cls) -> ProviderClient | None:
        return cls()
    async def _do_upload(self, file_path: Path) -> UploadResult:
        validate_file(file_path)
        data = aiohttp.FormData()
        with open(file_path, "rb") as f:
            data.add_field("file", f, filename=file_path.name)
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    handle_http_response(response, self.provider_name)
                    response_text = await response.text()
                    for line in response_text.splitlines():
                        if line.startswith("wget "):
                            url = line.split(" ")[1].strip()
                            log_upload_attempt(self.provider_name, file_path, True)
                            return UploadResult(
                    raise NonRetryableError(msg, self.provider_name)
    def upload_file(
        return asyncio.run(self._do_upload(Path(str(local_path))))
    async def async_upload_file(
            result = await self._do_upload(Path(str(file_path)))
            return cast(Awaitable[UploadResult], result)
            log_upload_attempt(self.provider_name, file_path, False, e)
def get_provider() -> BashUploadProvider:
    return BashUploadProvider()
    provider = get_provider()
    return provider.upload_file(

================
File: src/twat_fs/upload_providers/catbox.py
================
class CatboxProvider(Provider, ProviderClient):
    def get_credentials(cls) -> None:
    def get_provider(cls) -> ProviderClient | None:
        return cls()
    async def _do_upload(self, file_path: Path) -> UploadResult:
        validate_file(file_path)
        data = aiohttp.FormData()
        data.add_field("reqtype", "fileupload")
        with open(file_path, "rb") as f:
            data.add_field("fileToUpload", f, filename=file_path.name)
            async with aiohttp.ClientSession() as session:
                async with session.post(self.upload_url, data=data) as response:
                    handle_http_response(response, self.provider_name)
                    url = await response.text()
                    log_upload_attempt(self.provider_name, file_path, True)
                    return UploadResult(url=url)
    def upload_file(
        return asyncio.run(self._do_upload(Path(str(local_path))))
    async def async_upload_file(
            result = await self._do_upload(Path(str(file_path)))
            return cast(Awaitable[UploadResult], result)
            log_upload_attempt(self.provider_name, file_path, False, e)
def get_provider() -> CatboxProvider:
    return CatboxProvider()
    provider = get_provider()
    return provider.upload_file(

================
File: src/twat_fs/upload_providers/core.py
================
T = TypeVar("T", covariant=True)
R = TypeVar("R", str, UploadResult, covariant=True)
P = ParamSpec("P")
URL_CHECK_TIMEOUT = aiohttp.ClientTimeout(total=30.0)  # seconds
class UploadCallable(Protocol[P, T]):
    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T: ...
class AsyncUploadCallable(Protocol[P, T]):
    async def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T: ...
def convert_to_upload_result(result: str) -> UploadResult: ...
def convert_to_upload_result(
def convert_to_upload_result(result: UploadResult) -> UploadResult: ...
def convert_to_upload_result(result: dict[str, Any]) -> UploadResult: ...
    if isinstance(result, UploadResult):
            result.metadata.update(metadata)
    if isinstance(result, str):
        return UploadResult(url=result, metadata=meta)
    if isinstance(result, dict):
        return UploadResult(**result)
    msg = f"Cannot convert {type(result)} to UploadResult"
    raise TypeError(msg)
class TimingMetrics(NamedTuple):
    def as_dict(self) -> dict[str, float | str]:
class RetryStrategy(str, Enum):
def with_retry(
    def decorator(func: UploadCallable[P, T]) -> UploadCallable[P, T]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            for attempt in range(max_attempts):
                    return func(*args, **kwargs)
                        delay = min(initial_delay * (2**attempt), max_delay)
                        delay = min(initial_delay * (attempt + 1), max_delay)
                    logger.warning(
                    time.sleep(delay)
def with_async_retry(
    def decorator(func: AsyncUploadCallable[P, T]) -> AsyncUploadCallable[P, T]:
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
                    return await func(*args, **kwargs)
                    await asyncio.sleep(delay)
def validate_file(func: UploadCallable[P, T]) -> UploadCallable[P, T]:
        file_path = next(
            (arg for arg in args if isinstance(arg, str | Path)),
            kwargs.get("local_path") or kwargs.get("file_path"),
            raise ValueError(msg)
        path = Path(str(file_path))  # Explicitly convert to string
        if not path.exists():
            raise FileNotFoundError(msg)
        if not path.is_file():
        if not path.stat().st_size:
def sync_to_async(func: UploadCallable[P, T]) -> AsyncUploadCallable[P, T]:
        return await asyncio.to_thread(func, *args, **kwargs)
def async_to_sync(func: AsyncUploadCallable[P, T]) -> UploadCallable[P, T]:
        return asyncio.run(func(*args, **kwargs))
class UploadError(Exception):
    def __init__(self, message: str, provider: str | None = None) -> None:
        super().__init__(message)
class RetryableError(UploadError):
class NonRetryableError(UploadError):
async def validate_url(
            aiohttp.ClientSession(
            session.head(
                raise NonRetryableError(msg)
            raise RetryableError(msg)
        raise RetryableError(msg) from e
@with_async_retry(
async def ensure_url_accessible(url: str) -> UploadResult:
    if await validate_url(url):
        return convert_to_upload_result(url)
def with_url_validation(
    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> UploadResult:
        result = await func(*args, **kwargs)
            return await ensure_url_accessible(result)
def with_timing(func: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
        start_time = time.time()
        end_time = time.time()

================
File: src/twat_fs/upload_providers/dropbox.py
================
load_dotenv()
class DropboxCredentials(TypedDict):
class DropboxClient:
    def __init__(self, credentials: DropboxCredentials) -> None:
        self.dbx = self._create_client()
    def _create_client(self) -> dropbox.Dropbox:
        return dropbox.Dropbox(
    def _refresh_token_if_needed(self) -> None:
            self.dbx.users_get_current_account()
            if "expired_access_token" in str(e):
                    logger.debug(
                logger.debug("Access token expired, attempting refresh")
                    self.dbx.refresh_access_token()
                    logger.debug(f"Unable to refresh access token: {refresh_err}")
                logger.debug(f"Authentication error: {e}")
    def upload_file(
        logger.debug(f"Starting upload process for {file_path}")
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(msg)
        if not os.access(path, os.R_OK):
            raise PermissionError(msg)
            self._refresh_token_if_needed()
            upload_path = _normalize_path(upload_path)
            remote_path = path.name if remote_path is None else str(remote_path)
                timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
                name, ext = os.path.splitext(remote_path)
            db_path = _normalize_path(os.path.join(upload_path, remote_path))
            logger.debug(f"Target Dropbox path: {db_path}")
            _ensure_upload_directory(self.dbx, upload_path)
            exists, remote_metadata = _check_file_exists(self.dbx, db_path)
                raise DropboxFileExistsError(msg)
            file_size = os.path.getsize(path)
                _upload_small_file(self.dbx, path, db_path)
                _upload_large_file(self.dbx, path, db_path, SMALL_FILE_THRESHOLD)
            url = _get_share_url(self.dbx, db_path)
                raise DropboxUploadError(msg)
            logger.debug(f"Successfully uploaded to Dropbox: {url}")
            return convert_to_upload_result(url)
            logger.error(f"Failed to upload to Dropbox: {e}")
            raise DropboxUploadError(msg) from e
    def get_account_info(self) -> None:
            logger.error(f"Failed to get account info: {e}")
def get_credentials() -> DropboxCredentials | None:
    access_token = os.getenv("DROPBOX_ACCESS_TOKEN")
        logger.debug("DROPBOX_ACCESS_TOKEN environment variable not set")
        "refresh_token": os.getenv("DROPBOX_REFRESH_TOKEN"),
        "app_key": os.getenv("DROPBOX_APP_KEY"),
        "app_secret": os.getenv("DROPBOX_APP_SECRET"),
def get_provider() -> DropboxClient | None:
        credentials = get_credentials()
        client = DropboxClient(credentials)
            client.get_account_info()
                    logger.warning(
                    logger.error(
                logger.error(f"Dropbox authentication failed: {e}")
            logger.error(f"Failed to initialize Dropbox client: {e}")
        logger.error(f"Error initializing Dropbox client: {e}")
    client = get_provider()
        raise ValueError(msg)
        remote_path = Path(file_path).name
        return client.upload_file(
        raise ValueError(msg) from e
class DropboxUploadError(Exception):
class PathConflictError(DropboxUploadError):
class DropboxFileExistsError(Exception):
    def __init__(self, message: str, url: str | None = None):
        super().__init__(message)
class FolderExistsError(PathConflictError):
def _validate_file(local_path: Path) -> None:
    if not get_provider():
    if not local_path.exists():
    if not os.access(local_path, os.R_OK):
def _get_download_url(url: str) -> str | None:
        parsed = parse.urlparse(url)
        query = dict(parse.parse_qsl(parsed.query))
        return parsed._replace(
            netloc="dl.dropboxusercontent.com", query=parse.urlencode(query)
        ).geturl()
        logger.error(f"Failed to generate download URL: {e}")
def _get_share_url(dbx: dropbox.Dropbox, db_path: str) -> str:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
    def get_url() -> str:
            shared_link = dbx.sharing_create_shared_link_with_settings(db_path)
            url = str(shared_link.url).replace("?dl=0", "?dl=1")
            logger.debug(f"Created share URL: {url}")
            logger.error(f"Failed to create share URL: {e}")
        url = get_url()
        if not isinstance(url, str):
            msg = f"Expected string URL but got {type(url)}"
def _ensure_upload_directory(dbx: Any, upload_path: str) -> None:
    logger.debug(f"Ensuring upload directory exists: {upload_path}")
            logger.debug(f"Attempting to create directory: {upload_path}")
            dbx.files_create_folder_v2(upload_path)
            logger.debug(f"Successfully created directory: {upload_path}")
                isinstance(e.error, dropbox.files.CreateFolderError)
                and e.error.get_path().is_conflict()
                logger.debug(f"Directory already exists: {upload_path}")
            logger.error(f"Failed to create directory: {e}")
def _get_file_metadata(dbx: Any, db_path: str) -> dict | None:
        metadata = dbx.files_get_metadata(db_path)
        if e.error.is_path() and e.error.get_path().is_not_found():
def _check_file_exists(dbx: Any, db_path: str) -> tuple[bool, dict | None]:
        if metadata := _get_file_metadata(dbx, db_path):
        logger.warning(f"Error checking file existence: {e}")
def _upload_small_file(dbx: dropbox.Dropbox, file_path: Path, db_path: str) -> None:
    logger.debug(f"Uploading small file: {file_path} -> {db_path}")
        with open(file_path, "rb") as f:
            dbx.files_upload(f.read(), db_path, mode=dropbox.files.WriteMode.overwrite)
        logger.debug(f"Successfully uploaded small file: {db_path}")
        logger.error(f"Failed to upload small file: {e}")
def _upload_large_file(
    logger.debug(f"Starting chunked upload: {file_path} -> {db_path}")
    file_size = os.path.getsize(file_path)
            upload_session_start_result = dbx.files_upload_session_start(
                f.read(chunk_size)
            logger.debug("Upload session started")
            cursor = dropbox.files.UploadSessionCursor(
                offset=f.tell(),
            commit = dropbox.files.CommitInfo(
            while f.tell() < file_size:
                if (file_size - f.tell()) <= chunk_size:
                    logger.debug("Uploading final chunk")
                    dbx.files_upload_session_finish(f.read(chunk_size), cursor, commit)
                    logger.debug(f"Uploading chunk at offset {cursor.offset}")
                    dbx.files_upload_session_append_v2(f.read(chunk_size), cursor)
                    cursor.offset = f.tell()
        logger.debug(f"Successfully uploaded large file: {db_path}")
        logger.error(f"Failed to upload large file: {e}")
def _normalize_path(path: str) -> str:
    normalized = path.replace("\\", "/")
    normalized = normalized.lstrip("/")
def _handle_api_error(e: Any, operation: str) -> None:
    if isinstance(e, AuthError):
        logger.error(f"Authentication error during {operation}: {e}")
    elif isinstance(e, dropbox.exceptions.ApiError):
        if e.error.is_path():
            path_error = e.error.get_path()
            if path_error.is_not_found():
                logger.error(f"Path not found during {operation}: {e}")
            elif path_error.is_not_file():
                logger.error(f"Not a file error during {operation}: {e}")
            elif path_error.is_conflict():
                logger.error(f"Path conflict during {operation}: {e}")
        logger.error(f"API error during {operation}: {e}")
        logger.error(f"Unexpected error during {operation}: {e}")
def _validate_credentials(credentials: DropboxCredentials) -> None:
def _get_client(credentials: DropboxCredentials) -> Any:
def _refresh_token(credentials: DropboxCredentials) -> DropboxCredentials | None:

================
File: src/twat_fs/upload_providers/fal.py
================
class FalProvider(ProviderClient, Provider):
    def __init__(self, key: str) -> None:
        self.client = fal_client.SyncClient(key=key)
    def get_credentials(cls) -> dict[str, str] | None:
        key = os.getenv("FAL_KEY")
    def get_provider(cls) -> ProviderClient | None:
        creds = cls.get_credentials()
            logger.debug("FAL_KEY not set in environment")
            return cls(key=str(creds["key"]).strip())
            logger.warning(f"Failed to initialize FAL provider: {err}")
    @with_async_retry(
    async def async_upload_file(
        if not hasattr(self.client, "upload_file"):
            raise NonRetryableError(msg, "fal")
            logger.debug("FAL: API credentials verified")
            if "401" in str(e) or "unauthorized" in str(e).lower():
            raise RetryableError(msg, "fal")
            file_path_str = str(file_path)
                result = self.client.upload_file(file_path_str)
                if "TypeError" in str(e) or "str" in str(e):
                    with open(file_path_str, "rb") as f:
                        result = self.client.upload_file(f)
            result_str = str(result).strip()
            return convert_to_upload_result(
    def upload_file(
        result: UploadResult = async_to_sync(self.async_upload_file)(
def get_credentials() -> dict[str, str] | None:
    return FalProvider.get_credentials()
def get_provider() -> ProviderClient | None:
    return FalProvider.get_provider()
    provider = get_provider()
    return provider.upload_file(

================
File: src/twat_fs/upload_providers/filebin.py
================
class FilebinProvider(BaseProvider):
    def __init__(self) -> None:
        super().__init__()
    def upload_file_impl(self, file: BinaryIO) -> UploadResult:
            filename = Path(file.name).name
            timestamp = int(time.time())
            suffix = "".join(
                secrets.choice(string.ascii_lowercase + string.digits) for _ in range(6)
            for attempt in range(max_retries):
                    response = requests.put(
                        logger.debug(
                        return UploadResult(
                            url=str(file_url),
                        time.sleep(retry_delay)
                        file.seek(0)  # Reset file pointer for retry
                        file.seek(0)
                raise ValueError(last_error)
            raise ValueError(msg)
            logger.error(f"Failed to upload to filebin.net: {e}")
                    "error": str(e),
    def get_credentials(cls) -> None:
    def get_provider(cls) -> ProviderClient | None:
        return cast(ProviderClient, cls())
def get_credentials() -> None:
def get_provider() -> ProviderClient | None:
    return FilebinProvider.get_provider()
def upload_file(
    provider = get_provider()
    return provider.upload_file(local_path, remote_path)

================
File: src/twat_fs/upload_providers/litterbox.py
================
T = TypeVar("T")
P = ParamSpec("P")
def _ensure_coroutine(
    @wraps(func)
    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> UploadResult:
        result = await func(*args, **kwargs)
        if isinstance(result, str):
            return convert_to_upload_result(result)
        if not isinstance(result, UploadResult):
            msg = f"Expected UploadResult or str, got {type(result)}"
            raise RuntimeError(msg)
class LitterboxProvider(ProviderClient, Provider):
    def __init__(
        if not isinstance(default_expiration, ExpirationTime):
                default_expiration = ExpirationTime(default_expiration)
                raise ValueError(msg) from e
    def get_credentials(cls) -> dict[str, Any] | None:
    def get_provider(cls) -> ProviderClient | None:
        default_expiration = str(
            os.getenv("LITTERBOX_DEFAULT_EXPIRATION", "24h")
        ).strip()
            expiration = ExpirationTime(str(default_expiration))
            logger.warning(
        return cls(default_expiration=expiration)
    async def async_upload_file(
        file_path = Path(str(file_path))
        if not file_path.exists():
            raise FileNotFoundError(msg)
        expiration = kwargs.get("expiration") or self.default_expiration
        data = aiohttp.FormData()
        data.add_field("reqtype", "fileupload")
        data.add_field(
            str(
                if isinstance(expiration, ExpirationTime)
            with open(str(file_path), "rb") as f:
                file_content = f.read()
                filename=str(file_path.name),
            async with aiohttp.ClientSession() as session:
                    async with session.post(LITTERBOX_API_URL, data=data) as response:
                            raise RetryableError(msg, "litterbox")
                            error_text = await response.text()
                                raise NonRetryableError(msg, "litterbox")
                        url = await response.text()
                        if not url.startswith("http"):
                        return convert_to_upload_result(
                    raise RetryableError(msg, "litterbox") from e
    def upload_file(
        return cast(
            async_to_sync(self.async_upload_file)(
def get_credentials() -> dict[str, Any] | None:
    return LitterboxProvider.get_credentials()
def get_provider() -> ProviderClient | None:
    return LitterboxProvider.get_provider()
    provider = get_provider()
    return provider.upload_file(local_path, remote_path, expiration=expiration)

================
File: src/twat_fs/upload_providers/pixeldrain.py
================
class PixeldrainProvider(BaseProvider):
    def __init__(self) -> None:
        super().__init__()
    def upload_file_impl(self, file: BinaryIO) -> UploadResult:
            for attempt in range(max_retries):
                    response = requests.post(
                            time.sleep(retry_delay)
                            file.seek(0)
                        data = response.json()
                    logger.debug(f"Successfully uploaded to pixeldrain.com: {url}")
                    return UploadResult(
                raise ValueError(last_error)
            raise ValueError(msg)
            logger.error(f"Failed to upload to pixeldrain.com: {e}")
                    "error": str(e),
    def get_credentials(cls) -> None:
        return None
    def get_provider(cls) -> ProviderClient | None:
        return cast(ProviderClient, cls())
    def _get_file_url(self, file_id: str | None) -> str | None:
def get_credentials() -> None:
    return PixeldrainProvider.get_credentials()
def get_provider() -> ProviderClient | None:
    return PixeldrainProvider.get_provider()
def upload_file(
    provider = get_provider()
    return provider.upload_file(local_path, remote_path)

================
File: src/twat_fs/upload_providers/protocols.py
================
T_co = TypeVar("T_co", covariant=True)
T_ret = TypeVar("T_ret", bound=UploadResult)
class ProviderHelp(TypedDict):
class ProviderClient(Protocol):
    def upload_file(
    async def async_upload_file(
class Provider(Protocol):
    def get_credentials(cls) -> Any | None:
    def get_provider(cls) -> ProviderClient | None:

================
File: src/twat_fs/upload_providers/s3.py
================
def get_credentials() -> dict[str, Any] | None:
    bucket = os.getenv("AWS_S3_BUCKET")
        logger.debug("Required AWS_S3_BUCKET environment variable not set")
    region = os.getenv("AWS_DEFAULT_REGION")
    path_style = os.getenv("AWS_S3_PATH_STYLE", "").lower() == "true"
    endpoint_url = os.getenv("AWS_ENDPOINT_URL")
    role_arn = os.getenv("AWS_ROLE_ARN")
    access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
    secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
    session_token = os.getenv("AWS_SESSION_TOKEN")
def get_provider(creds: dict[str, Any] | None = None):
        creds = get_credentials()
    if creds.get("region"):
    if creds.get("endpoint_url"):
    if creds.get("path_style"):
        client_kwargs["config"] = Config(s3={"addressing_style": "path"})
        client = boto3.client("s3", **client_kwargs)
            client.list_buckets()
            logger.warning(f"Failed to validate S3 client: {e}")
        logger.warning(f"Failed to create S3 client: {e}")
def upload_file(
    local_path = Path(local_path)
        raise ValueError(msg)
    client = get_provider(creds)
    key = str(remote_path or local_path.name)
        with open(local_path, "rb") as f:
            client.upload_fileobj(f, creds["bucket"], key)
            if creds.get("endpoint_url")
        return convert_to_upload_result(
        logger.warning(f"S3 upload failed: {e}")
        raise ValueError(msg) from e

================
File: src/twat_fs/upload_providers/simple.py
================
class SimpleProviderClient(Protocol):
    async def upload_file(self, file_path: Path) -> UploadResult:
T = TypeVar("T", bound="BaseProvider")
class BaseProvider(ABC, Provider):
    def get_credentials(cls) -> dict[str, Any] | None:
    def get_provider(cls) -> ProviderClient | None:
        return cls()
    def upload_file(
        path = Path(local_path)
        self._validate_file(path)
        with self._open_file(path) as file:
            result = self.upload_file_impl(file)
            if not result.metadata.get("success", True):
                msg = f"Upload failed: {result.metadata.get('error', 'Unknown error')}"
                raise RuntimeError(msg)
            return convert_to_upload_result(
    async def async_upload_file(
        path = Path(str(file_path))
        if not path.exists():
            raise FileNotFoundError(msg)
            f"file://{path.absolute()}",
                "local_path": str(path),
    def _open_file(self, file_path: Path) -> Generator[BinaryIO]:
            file = open(file_path, "rb")
                file.close()
    def _validate_file(self, file_path: Path) -> None:
        if not file_path.exists():
        if not file_path.is_file():
            raise ValueError(msg)
        if not os.access(file_path, os.R_OK):
            raise PermissionError(msg)
    def upload_file_impl(self, file: BinaryIO) -> UploadResult:

================
File: src/twat_fs/upload_providers/types.py
================
class ExpirationTime(str, Enum):
class UploadResult:
    def __init__(self, url: str, metadata: dict[str, Any] | None = None) -> None:

================
File: src/twat_fs/upload_providers/uguu.py
================
class UguuProvider(BaseProvider):
    def __init__(self) -> None:
        super().__init__()
    def upload_file_impl(self, file: BinaryIO) -> UploadResult:
            response = requests.post(self.url, files=files, timeout=30)
                raise RetryableError(msg, "uguu")
                raise NonRetryableError(msg, "uguu")
            result = response.json()
            logger.debug(f"Successfully uploaded to uguu.se: {url}")
            return UploadResult(
            raise RetryableError(msg, "uguu") from e
                    "error": str(e),
    def get_credentials(cls) -> dict[str, Any] | None:
    def get_provider(cls) -> ProviderClient | None:
        return cast(ProviderClient, cls())
def get_credentials() -> dict[str, Any] | None:
    return UguuProvider.get_credentials()
def get_provider() -> ProviderClient | None:
    return UguuProvider.get_provider()
def upload_file(
    provider = get_provider()
        raise ValueError(msg)
    return provider.upload_file(local_path, remote_path)

================
File: src/twat_fs/upload_providers/utils.py
================
T = TypeVar("T")
P = ParamSpec("P")
def create_provider_help(
def safe_file_handle(file_path: str | Path) -> Generator[BinaryIO]:
    path = Path(str(file_path))
    validate_file(path)
        file = open(path, "rb")
            file.close()
def validate_file(file_path: Path) -> None:
    if not file_path.exists():
        raise FileNotFoundError(msg)
    if not file_path.is_file():
        raise ValueError(msg)
    if not os.access(file_path, os.R_OK):
        raise PermissionError(msg)
def handle_http_response(
        if isinstance(response, aiohttp.ClientResponse)
        msg = f"Rate limited: {response.text if isinstance(response, requests.Response) else 'Too many requests'}"
        raise RetryableError(msg, provider_name)
        raise NonRetryableError(msg, provider_name)
def get_env_credentials(
    missing = [var for var in required_vars if not os.getenv(var)]
        logger.debug(f"Missing required environment variables: {', '.join(missing)}")
        if value := os.getenv(var):
def create_provider_instance(
            credentials = provider_class.get_credentials()
        if hasattr(provider_class, "get_provider"):
            return provider_class.get_provider()
        return cast(ProviderClient, provider_class())
        logger.warning(f"Failed to initialize provider {provider_class.__name__}: {e}")
def standard_upload_wrapper(
    return provider.upload_file(local_path, remote_path, **kwargs)
def log_upload_attempt(
        logger.debug(f"Successfully uploaded {file_path} using {provider_name}")
        logger.error(f"Failed to upload {file_path} using {provider_name}: {error}")

================
File: src/twat_fs/upload_providers/www0x0.py
================
class Www0x0Provider(BaseProvider):
    def __init__(self) -> None:
        super().__init__()
    def upload_file_impl(self, file: BinaryIO) -> UploadResult:
            response = requests.post(self.url, files=files, timeout=30)
                raise RetryableError(msg, "www0x0")
                raise NonRetryableError(msg, "www0x0")
            url = response.text.strip()
            if not url.startswith("http"):
            logger.debug(f"Successfully uploaded to 0x0.st: {url}")
            return UploadResult(
            raise RetryableError(msg, "www0x0") from e
                    "error": str(e),
    def get_credentials(cls) -> None:
    def get_provider(cls) -> ProviderClient | None:
        return cast(ProviderClient, cls())
def get_credentials() -> None:
def get_provider() -> ProviderClient | None:
    return Www0x0Provider.get_provider()
def upload_file(
    provider = get_provider()
        raise ValueError(msg)
    return provider.upload_file(local_path, remote_path)

================
File: src/twat_fs/__init__.py
================
__version__ = metadata.version(__name__)

================
File: src/twat_fs/__main__.py
================
    main()

================
File: src/twat_fs/cli.py
================
logger.remove()  # Remove default handler
log_level = os.getenv("LOGURU_LEVEL", "INFO").upper()
logger.add(
    filter=lambda record: record["level"].no < logger.level("WARNING").no,
def parse_provider_list(provider: str) -> list[str] | None:
    if provider.startswith("[") and provider.endswith("]"):
            return [p.strip() for p in provider[1:-1].split(",")]
class UploadProviderCommands:
    def status(self, provider_id: str | None = None, online: bool = False) -> None:
        console = Console(stderr=True)  # Use stderr for error messages
            with console.status("[cyan]Testing provider...[/cyan]"):
                result = _setup_provider(provider_id, verbose=True, online=online)
                    console.print(
                            f"\n[cyan]Online test time: {result.timing.get('total_duration', 0.0):.2f}s[/cyan]"
                    console.print(f"\n[red]Provider {provider_id} is not ready[/red]")
                    console.print(f"\n[yellow]Reason:[/yellow] {result.explanation}")
                    console.print("\n[yellow]Setup Instructions:[/yellow]")
                        result.help_info.get("setup", "No setup instructions available")
            table = Table(title="Provider Setup Status", show_lines=True)
            table.add_column("Provider", no_wrap=True)
            table.add_column("Status", no_wrap=True)
                table.add_column("Time (s)", justify="right", no_wrap=True)
            table.add_column("Details", width=50)
            with console.status("[cyan]Testing providers...[/cyan]") as status:
                results = _setup_providers(verbose=True, online=online)
                for provider, info in results.items():
                    if provider.lower() == "simple":
                            info.timing.get("total_duration", float("inf"))
                            else float("inf")
                        ready_providers.append((provider, info, time))
                        not_ready_providers.append((provider, info, time))
                ready_providers.sort(key=lambda x: x[2])
                not_ready_providers.sort(key=lambda x: x[0])
                    if info.help_info.get("setup"):
                            and info.timing.get("total_duration") is not None
                            time = f"{info.timing.get('total_duration', 0.0):.2f}"
                    table.add_row(*row)
            console.print("\n")  # Add some spacing
            console.print(table)
    def list(self, online: bool = False) -> None:
            logger.remove()
            logger.add(sys.stderr, level="INFO", format="{message}")
            result = _setup_provider(provider, verbose=False, online=online)
                active_providers.append(provider)
        sys.exit(0)
class TwatFS:
    def __init__(self) -> None:
        self.upload_provider = UploadProviderCommands()
    def upload(
            if isinstance(provider, str):
                providers = parse_provider_list(provider)
            if not Path(file_path).exists():
                logger.error(f"File not found: {file_path}")
                sys.exit(1)
            return _upload_file(
            logger.error(f"Upload failed: {e}")
def main() -> None:
        fire.Fire(TwatFS)
        fire.Fire(TwatFS())
upload_file = TwatFS().upload
setup_provider = TwatFS().upload_provider.status
def setup_providers():
    return TwatFS().upload_provider.status(None)
    main()

================
File: src/twat_fs/py.typed
================


================
File: src/twat_fs/upload.py
================
class ProviderStatus(Enum):
    READY = auto()
    NEEDS_CONFIG = auto()
    NOT_AVAILABLE = auto()
class ProviderInfo:
def _test_provider_online(
    test_file = Path(__file__).parent / "data" / "test.jpg"
    if not test_file.exists():
        with open(test_file, "rb") as f:
            original_hash = hashlib.sha256(f.read()).hexdigest()
        provider_module = get_provider_module(provider_name)
        client = provider_module.get_provider()
        start_time = time.time()
        read_start = time.time()
            _ = f.read()  # Read file to measure read time
        read_duration = time.time() - read_start
            upload_start = time.time()
            result = client.upload_file(test_file)
            upload_duration = time.time() - upload_start
                end_time = time.time()
                    "provider": float(hash(provider_name)),
            if isinstance(result, UploadResult):
                timing_metrics = result.metadata.get("timing", {})
            time.sleep(1.0)
            validation_start = time.time()
            response = requests.head(
            validation_duration = time.time() - validation_start
                "upload_duration": time.time() - upload_start,
            if "401" in str(e) or "authentication" in str(e).lower():
                "validation_duration": time.time() - validation_start,
        time.time()
        for attempt in range(max_retries):
                response = requests.get(url, timeout=30)
                    downloaded_hash = hashlib.sha256(response.content).hexdigest()
                    time.sleep(retry_delay)
def setup_provider(
        if provider.lower() == "simple":
            return ProviderInfo(
        provider_module = get_provider_module(provider)
            help_info = get_provider_help(provider)
            setup_info = help_info.get("setup", "")
            setup_info = help_info.get("setup", "").lower()
                    retention_note = setup_info[setup_info.find("note:") :].strip()
                setup_info = help_info.get("setup", "") if help_info else ""
            has_async = hasattr(client, "async_upload_file") and not getattr(
            has_sync = hasattr(client, "upload_file") and not getattr(
                provider_info = ProviderInfo(
                    f"{provider} ({type(client).__name__})"
                online_status, message, timing = _test_provider_online(provider)
                    logger.debug(
            logger.error(f"Error setting up provider {provider}: {e}")
        logger.error(f"Unexpected error setting up provider {provider}: {e}")
def setup_providers(
        result = setup_provider(provider, verbose=verbose, online=online)
def _get_provider_module(provider: str) -> Provider | None:
    return get_provider_module(provider)
def _try_upload_with_provider(
        raise ValueError(msg)
    path = Path(str(file_path))
    if path.exists():
        with open(path, "rb") as f:
    upload_result = client.upload_file(
        upload_result.url if isinstance(upload_result, UploadResult) else upload_result
            logger.warning(f"URL validation failed: status {response.status_code}")
        logger.warning(f"URL validation failed: {e}")
        UploadResult(url=upload_result, metadata={"provider": provider_name})
        if isinstance(upload_result, str)
    logger.info(
def _try_next_provider(
    tried = tried_providers or set()
        tried.add(provider)
            result = _try_upload_with_provider(
            logger.warning(f"Provider {provider} failed: {e}")
    msg = f"All providers failed: {', '.join(tried)}"
@with_retry(
def upload_file(
    path = Path(file_path)
    if not path.exists():
        raise FileNotFoundError(msg)
    if not path.is_file():
        providers = [provider] if isinstance(provider, str) else list(provider)
        return _try_upload_with_provider(
        if fragile or len(providers) == 1:
            raise NonRetryableError(msg, providers[0])
        logger.info(f"Provider {providers[0]} failed, trying alternatives")
        return _try_next_provider(
def _try_upload_with_fallback(
            raise RuntimeError(msg)
        return _try_upload_with_fallback(

================
File: tests/data/test.txt
================
This is a test file for upload testing.
It contains some sample content that will be used in tests.

================
File: tests/__init__.py
================


================
File: tests/test_filebin_pixeldrain.py
================
def test_file(tmp_path: Path) -> Path:
    file_path.write_text("test content")
def test_filebin_upload_success(test_file: Path) -> None:
    responses.add(
    provider = filebin.FilebinProvider()
    result = provider.upload_file(test_file)
def test_filebin_upload_failure(test_file: Path) -> None:
    with pytest.raises(RuntimeError, match="Upload failed"):
        provider.upload_file(test_file)
def test_pixeldrain_upload_success(test_file: Path) -> None:
    provider = pixeldrain.PixeldrainProvider()
def test_pixeldrain_upload_failure(test_file: Path) -> None:
def test_filebin_provider_initialization() -> None:
    provider = filebin.get_provider()
    assert isinstance(provider, filebin.FilebinProvider)
def test_pixeldrain_provider_initialization() -> None:
    provider = pixeldrain.get_provider()
    assert isinstance(provider, pixeldrain.PixeldrainProvider)

================
File: tests/test_integration.py
================
TEST_DIR = Path(__file__).parent / "data"
@pytest.fixture(scope="session")
def large_test_file():
    if not LARGE_FILE.exists():
        logger.info(
        LARGE_FILE.parent.mkdir(exist_ok=True)
        with LARGE_FILE.open("wb") as f:
            f.write(os.urandom(LARGE_FILE_SIZE))
@pytest.fixture(scope="session", autouse=True)
def cleanup_test_files():
    if LARGE_FILE.exists():
        LARGE_FILE.unlink()
class TestS3Integration:
    @pytest.fixture(autouse=True)
    def check_s3_credentials(self):
        if not s3.get_credentials():
            pytest.skip("S3 credentials not configured")
    def test_s3_setup(self):
        result = setup_provider("s3")
    def test_s3_upload_small_file(self):
        url = upload_file(SMALL_FILE, provider="s3")
        assert url.startswith("https://")
        assert url.endswith(SMALL_FILE.name)
    def test_s3_upload_large_file(self, large_test_file):
        start_time = time.time()
        url = upload_file(large_test_file, provider="s3")
        upload_time = time.time() - start_time
        assert url.endswith(large_test_file.name)
        logger.info(f"Large file upload took {upload_time:.1f} seconds")
    def test_s3_upload_with_custom_endpoint(self, monkeypatch):
        monkeypatch.setenv("AWS_ENDPOINT_URL", custom_endpoint)
class TestDropboxIntegration:
    def test_dropbox_setup(self) -> None:
        result = setup_provider("dropbox")
            and "setup" in result.explanation.lower()
    def test_dropbox_upload_small_file(self) -> None:
            url = upload_file(SMALL_FILE, provider="dropbox")
            assert "Failed to initialize Dropbox client" in str(e)
            assert "expired_access_token" in str(e) or "not configured" in str(e)
    def test_dropbox_upload_large_file(self, large_test_file: Path) -> None:
            url = upload_file(large_test_file, provider="dropbox")
class TestFalIntegration:
    def check_fal_credentials(self):
        if not fal.get_credentials():
            pytest.skip("FAL credentials not configured")
    def test_fal_setup(self):
        success, explanation = setup_provider("fal")
    def test_fal_upload_small_file(self):
        url = upload_file(SMALL_FILE, provider="fal")
    def test_fal_upload_large_file(self, large_test_file):
        url = upload_file(large_test_file, provider="fal")
class TestSetupIntegration:
    def test_setup_all_providers(self) -> None:
        setup_providers()
            if provider.lower() == "simple":
            result = setup_provider(provider)
                or "setup" in result.explanation.lower()
class TestCatboxIntegration:
    def test_catbox_setup(self):
        status, _ = setup_provider("catbox")
    def test_catbox_upload_small_file(self, small_file):
        url = upload_file(small_file, provider="catbox")
        assert url.startswith("https://files.catbox.moe/")
        assert len(url) > len("https://files.catbox.moe/")
    def test_catbox_upload_large_file(self, large_file):
        url = upload_file(large_file, provider="catbox")
    @pytest.mark.skipif(
        not os.getenv("CATBOX_USERHASH"),
    def test_catbox_authenticated_upload(self, small_file):
        result = upload_file(small_file, provider="catbox")
        assert result.url.startswith("https://files.catbox.moe/")
        filename = result.url.split("/")[-1]
        provider = catbox.get_provider()
        success = provider.delete_files([filename])
class TestLitterboxIntegration:
    def test_litterbox_setup(self):
        status, _ = setup_provider("litterbox")
    def test_litterbox_upload_small_file(self, small_file):
        provider = litterbox.LitterboxProvider(default_expiration=ExpirationTime.HOUR_1)
        url = provider.upload_file(small_file)
        assert url.startswith("https://litterbox.catbox.moe/")
        assert len(url) > len("https://litterbox.catbox.moe/")
    def test_litterbox_upload_large_file(self, large_file):
        provider = litterbox.LitterboxProvider(
        url = provider.upload_file(large_file)
    def test_litterbox_different_expirations(self, small_file):
        provider = litterbox.LitterboxProvider()
            url = provider.upload_file(small_file, expiration=expiration)

================
File: tests/test_s3_advanced.py
================
TEST_DIR = Path(__file__).parent / "data"
class TestAwsCredentialProviders:
    def test_environment_credentials(self, monkeypatch):
        monkeypatch.setenv("AWS_ACCESS_KEY_ID", TEST_ACCESS_KEY)
        monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", TEST_SECRET_KEY)
        monkeypatch.setenv("AWS_S3_BUCKET", TEST_BUCKET)
        creds = s3.get_credentials()
        with patch("boto3.client") as mock_client:
            provider = s3.get_provider(creds)
    def test_shared_credentials_file(self, tmp_path, monkeypatch):
        creds_file.parent.mkdir(exist_ok=True)
        creds_file.write_text(
        monkeypatch.setenv("AWS_SHARED_CREDENTIALS_FILE", str(creds_file))
        monkeypatch.setenv("AWS_DEFAULT_REGION", "us-east-1")
        monkeypatch.setenv("AWS_S3_BUCKET", "test-bucket")
    def test_assume_role(self, monkeypatch):
        monkeypatch.setenv("AWS_ROLE_ARN", "arn:aws:iam::123456789012:role/test-role")
            mock_sts = MagicMock()
                MagicMock(),  # Second call creates S3 client
class TestS3Configurations:
    def test_custom_endpoint(self, monkeypatch):
        monkeypatch.setenv("AWS_ENDPOINT_URL", "https://custom-s3.example.com")
            mock_client.assert_called_with(
    def test_path_style_endpoint(self, monkeypatch):
        monkeypatch.setenv("AWS_S3_PATH_STYLE", "true")
            assert hasattr(config, "s3")
    def test_custom_region_endpoint(self, monkeypatch):
        monkeypatch.setenv("AWS_DEFAULT_REGION", "eu-central-1")
            mock_client.assert_called_with("s3", region_name="eu-central-1")
class TestS3MultipartUploads:
    def large_file(self, tmp_path):
        with file_path.open("wb") as f:
            f.write(os.urandom(size))
    def test_multipart_upload(self, large_file, monkeypatch):
        monkeypatch.setenv("AWS_ACCESS_KEY_ID", "test_key")
        monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "test_secret")
        mock_s3 = MagicMock()
        with patch("twat_fs.upload_providers.s3.boto3.client", return_value=mock_s3):
            url = s3.upload_file(large_file)
            assert url.startswith("https://s3.amazonaws.com/test-bucket/")
            assert url.endswith(large_file.name)
            assert len(args) == 3  # Should have file object, bucket, and key
            assert hasattr(args[0], "read")  # Check if it's a file-like object
    def test_multipart_upload_failure(self, large_file, monkeypatch):
        mock_s3.upload_fileobj.side_effect = ClientError(
            with pytest.raises(ValueError, match="S3 upload failed"):
                s3.upload_file(large_file)

================
File: tests/test_twat_fs.py
================
def test_version():

================
File: tests/test_upload.py
================
class ProviderSetupResult(NamedTuple):
TEST_FILE = Path(__file__).parent / "data" / "test.txt"
def test_file(tmp_path: Path) -> Generator[Path, None, None]:
    file_path.write_text("test content")
    file_path.unlink()
def mock_fal_provider() -> Generator[MagicMock, None, None]:
    with patch("twat_fs.upload_providers.fal.FalProvider") as mock:
def mock_dropbox_provider() -> Generator[MagicMock, None, None]:
    with patch("twat_fs.upload_providers.dropbox.DropboxProvider") as mock:
def mock_s3_provider() -> Generator[MagicMock, None, None]:
    with patch("twat_fs.upload_providers.s3.S3Provider") as mock:
class TestProviderSetup:
    def test_setup_working_provider(self, mock_s3_provider: MagicMock) -> None:
        result = setup_provider("s3")
    def test_setup_missing_credentials(self) -> None:
        with patch("twat_fs.upload_providers.s3.get_credentials") as mock_creds:
    def test_setup_missing_dependencies(self) -> None:
            with patch("twat_fs.upload_providers.s3.get_provider") as mock_provider:
    def test_setup_invalid_provider(self) -> None:
        result = setup_provider("invalid")
    def test_setup_all_providers(
        assert all(
        results = setup_providers()
        assert len(results) == len(PROVIDERS_PREFERENCE)
    def test_setup_all_providers_with_failures(self) -> None:
            patch("twat_fs.upload_providers.s3.get_credentials") as mock_s3_creds,
            patch(
            patch("twat_fs.upload_providers.fal.get_provider") as mock_fal_provider,
            mock_fal_provider.return_value = MagicMock()
    def test_setup_provider_success(self) -> None:
        result = setup_provider("simple")
    def test_setup_provider_failure(self) -> None:
        assert "provider not found" in result.explanation.lower()
        assert "setup" in result.explanation.lower()
    def test_setup_provider_dropbox(self) -> None:
        result = setup_provider("dropbox")
            and "setup" in result.explanation.lower()
    def test_setup_all_providers_check(self) -> None:
            if provider.lower() == "simple":
                or "setup" in result.explanation.lower()
class TestProviderAuth:
    def test_fal_auth_with_key(self, monkeypatch: pytest.MonkeyPatch) -> None:
        monkeypatch.setenv("FAL_KEY", "test_key")
        assert fal.get_credentials() is not None
        with patch("fal_client.status") as mock_status:
            assert fal.get_provider() is not None
    def test_fal_auth_without_key(self, monkeypatch: pytest.MonkeyPatch) -> None:
        monkeypatch.delenv("FAL_KEY", raising=False)
        assert fal.get_credentials() is None
        assert fal.get_provider() is None
    def test_dropbox_auth_with_token(self, monkeypatch: pytest.MonkeyPatch) -> None:
        monkeypatch.setenv("DROPBOX_ACCESS_TOKEN", "test_token")
        assert dropbox.get_credentials() is not None
        with patch("dropbox.Dropbox") as mock_dropbox:
            assert dropbox.get_provider() is not None
    def test_dropbox_auth_without_token(self, monkeypatch: pytest.MonkeyPatch) -> None:
        monkeypatch.delenv("DROPBOX_ACCESS_TOKEN", raising=False)
        assert dropbox.get_credentials() is None
        with pytest.raises(ValueError, match="Dropbox credentials not found"):
            dropbox.upload_file("test.txt")
    def test_s3_auth_with_credentials(self, monkeypatch: pytest.MonkeyPatch) -> None:
        monkeypatch.setenv("AWS_ACCESS_KEY_ID", "test_key")
        monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "test_secret")
        monkeypatch.setenv("AWS_DEFAULT_REGION", "us-east-1")
        monkeypatch.setenv("AWS_S3_BUCKET", "test-bucket")
        creds = s3.get_credentials()
        with patch("boto3.client") as mock_client:
            provider = s3.get_provider()
    def test_s3_auth_without_credentials(self, monkeypatch: pytest.MonkeyPatch) -> None:
        monkeypatch.delenv("AWS_ACCESS_KEY_ID", raising=False)
        monkeypatch.delenv("AWS_SECRET_ACCESS_KEY", raising=False)
        monkeypatch.delenv("AWS_DEFAULT_REGION", raising=False)
        monkeypatch.delenv("AWS_S3_BUCKET", raising=False)
        assert s3.get_credentials() is None
        assert s3.get_provider() is None
    def test_s3_auth_with_invalid_credentials(
        monkeypatch.setenv("AWS_ACCESS_KEY_ID", "invalid_key")
        monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "invalid_secret")
            mock_s3.head_bucket.side_effect = ClientError(
class TestUploadFile:
    def test_upload_with_default_provider(
        url = upload_file(test_file)
        mock_s3_provider.assert_called_once_with(
    def test_upload_with_specific_provider(
        url = upload_file(test_file, provider="s3")
    def test_upload_with_provider_list(
        url = upload_file(test_file, provider=["s3", "dropbox"])
    def test_upload_fallback_on_auth_failure(
            patch("twat_fs.upload_providers.s3.get_provider") as mock_s3_get_provider,
            mock_dropbox_client = MagicMock()
            mock_s3_provider.assert_not_called()
            mock_dropbox_client.upload_file.assert_called_once_with(
    def test_upload_fallback_on_upload_failure(
        mock_s3_provider.side_effect = Exception("Upload failed")
            mock_s3_client = MagicMock()
            mock_s3_client.upload_file.side_effect = Exception("Upload failed")
            mock_s3_client.upload_file.assert_called_once_with(
    def test_all_providers_fail(self, test_file: Path) -> None:
            patch("twat_fs.upload_providers.fal.get_provider") as mock_fal,
            patch("twat_fs.upload_providers.dropbox.get_provider") as mock_dropbox,
            patch("twat_fs.upload_providers.s3.get_provider") as mock_s3,
            patch("twat_fs.upload_providers.www0x0.get_provider") as mock_www0x0,
            patch("twat_fs.upload_providers.uguu.get_provider") as mock_uguu,
            with pytest.raises(
                upload_file(test_file)
    def test_invalid_provider(self, test_file: Path) -> None:
        with pytest.raises(ValueError, match="Invalid provider"):
            upload_file(test_file, provider="invalid")
    def test_upload_with_s3_provider(
    def test_s3_upload_failure(
        mock_s3_provider.side_effect = ClientError(
            upload_file(test_file, provider="s3")
class TestEdgeCases:
    def test_empty_file(self, tmp_path: Path) -> None:
        test_file.touch()
            client = MagicMock()
            client.upload_file.assert_called_once_with(
    def test_special_characters_in_filename(self, tmp_path: Path) -> None:
        test_file.write_text("test content")
    def test_unicode_filename(self, tmp_path: Path) -> None:
    def test_very_long_filename(self, tmp_path: Path) -> None:
    def test_nonexistent_file(self) -> None:
        with pytest.raises(FileNotFoundError):
            upload_file("nonexistent.txt")
    def test_directory_upload(self, tmp_path: Path) -> None:
        with pytest.raises(ValueError, match="is a directory"):
            upload_file(tmp_path)
    def test_no_read_permission(self, tmp_path: Path) -> None:
        test_file.chmod(0o000)  # Remove all permissions
        with pytest.raises(PermissionError):
    @pytest.mark.parametrize("size_mb", [1, 5, 10])
    def test_different_file_sizes(self, tmp_path: Path, size_mb: int) -> None:
        with test_file.open("wb") as f:
            f.write(b"0" * (size_mb * 1024 * 1024))
class TestCatboxProvider:
    def test_catbox_auth_with_userhash(self):
        provider = catbox.CatboxProvider({"userhash": "test_hash"})
    def test_catbox_auth_without_userhash(self):
        provider = catbox.CatboxProvider()
    async def test_catbox_upload_file(self, tmp_path):
        mock_response = AsyncMock()
        mock_response.text = AsyncMock(
        mock_session = AsyncMock()
        mock_session.post = AsyncMock()
        with patch("aiohttp.ClientSession", return_value=mock_session):
            result = await provider.async_upload_file(
            assert isinstance(result, UploadResult)
    async def test_catbox_upload_url(self):
            result = await provider.async_upload_url(
class TestLitterboxProvider:
    def test_litterbox_default_expiration(self):
        provider = litterbox.LitterboxProvider()
    def test_litterbox_custom_expiration(self):
        provider = litterbox.LitterboxProvider(
    def test_litterbox_invalid_expiration(self):
        with pytest.raises(ValueError):
            litterbox.LitterboxProvider(default_expiration="invalid")
    async def test_litterbox_upload_file(self, tmp_path):
def test_circular_fallback(
    mock_s3_provider.upload_file.side_effect = RetryableError("S3 failed", "s3")
    mock_dropbox_provider.upload_file.side_effect = RetryableError(
        RetryableError("Catbox failed first", "catbox"),  # First try fails
def test_fragile_mode(
    with pytest.raises(NonRetryableError) as exc_info:
        upload_file(test_file, provider="s3", fragile=True)
    assert "S3 failed" in str(exc_info.value)
def test_custom_provider_list_circular_fallback(
    mock_catbox_provider.upload_file.side_effect = RetryableError(
        RetryableError("Dropbox failed first", "dropbox"),  # First try fails
    url = upload_file(test_file, provider=["catbox", "s3", "dropbox"])

================
File: .gitignore
================
*_autogen/
.DS_Store
__version__.py
__pycache__/
_Chutzpah*
_deps
_NCrunch_*
_pkginfo.txt
_Pvt_Extensions
_ReSharper*/
_TeamCity*
_UpgradeReport_Files/
!?*.[Cc]ache/
!.axoCover/settings.json
!.vscode/extensions.json
!.vscode/launch.json
!.vscode/settings.json
!.vscode/tasks.json
!**/[Pp]ackages/build/
!Directory.Build.rsp
.*crunch*.local.xml
.axoCover/*
.builds
.cr/personal
.fake/
.history/
.ionide/
.localhistory/
.mfractor/
.ntvs_analysis.dat
.paket/paket.exe
.sass-cache/
.vs/
.vscode
.vscode/*
.vshistory/
[Aa][Rr][Mm]/
[Aa][Rr][Mm]64/
[Bb]in/
[Bb]uild[Ll]og.*
[Dd]ebug/
[Dd]ebugPS/
[Dd]ebugPublic/
[Ee]xpress/
[Ll]og/
[Ll]ogs/
[Oo]bj/
[Rr]elease/
[Rr]eleasePS/
[Rr]eleases/
[Tt]est[Rr]esult*/
[Ww][Ii][Nn]32/
*_h.h
*_i.c
*_p.c
*_wpftmp.csproj
*- [Bb]ackup ([0-9]).rdl
*- [Bb]ackup ([0-9][0-9]).rdl
*- [Bb]ackup.rdl
*.[Cc]ache
*.[Pp]ublish.xml
*.[Rr]e[Ss]harper
*.a
*.app
*.appx
*.appxbundle
*.appxupload
*.aps
*.azurePubxml
*.bim_*.settings
*.bim.layout
*.binlog
*.btm.cs
*.btp.cs
*.build.csdef
*.cab
*.cachefile
*.code-workspace
*.coverage
*.coveragexml
*.d
*.dbmdl
*.dbproj.schemaview
*.dll
*.dotCover
*.DotSettings.user
*.dsp
*.dsw
*.dylib
*.e2e
*.exe
*.gch
*.GhostDoc.xml
*.gpState
*.ilk
*.iobj
*.ipdb
*.jfm
*.jmconfig
*.la
*.lai
*.ldf
*.lib
*.lo
*.log
*.mdf
*.meta
*.mm.*
*.mod
*.msi
*.msix
*.msm
*.msp
*.ncb
*.ndf
*.nuget.props
*.nuget.targets
*.nupkg
*.nvuser
*.o
*.obj
*.odx.cs
*.opendb
*.opensdf
*.opt
*.out
*.pch
*.pdb
*.pfx
*.pgc
*.pgd
*.pidb
*.plg
*.psess
*.publishproj
*.publishsettings
*.pubxml
*.pyc
*.rdl.data
*.rptproj.bak
*.rptproj.rsuser
*.rsp
*.rsuser
*.sap
*.sbr
*.scc
*.sdf
*.sln.docstates
*.sln.iml
*.slo
*.smod
*.snupkg
*.so
*.suo
*.svclog
*.tlb
*.tlh
*.tli
*.tlog
*.tmp
*.tmp_proj
*.tss
*.user
*.userosscache
*.userprefs
*.vbp
*.vbw
*.VC.db
*.VC.VC.opendb
*.VisualState.xml
*.vsp
*.vspscc
*.vspx
*.vssscc
*.xsd.cs
**/[Pp]ackages/*
**/*.DesktopClient/GeneratedArtifacts
**/*.DesktopClient/ModelManifest.xml
**/*.HTMLClient/GeneratedArtifacts
**/*.Server/GeneratedArtifacts
**/*.Server/ModelManifest.xml
*~
~$*
$tf/
AppPackages/
artifacts/
ASALocalRun/
AutoTest.Net/
Backup*/
BenchmarkDotNet.Artifacts/
bld/
BundleArtifacts/
ClientBin/
cmake_install.cmake
CMakeCache.txt
CMakeFiles
CMakeLists.txt.user
CMakeScripts
CMakeUserPresets.json
compile_commands.json
coverage*.info
coverage*.json
coverage*.xml
csx/
CTestTestfile.cmake
dlldata.c
DocProject/buildhelp/
DocProject/Help/*.hhc
DocProject/Help/*.hhk
DocProject/Help/*.hhp
DocProject/Help/*.HxC
DocProject/Help/*.HxT
DocProject/Help/html
DocProject/Help/Html2
ecf/
FakesAssemblies/
FodyWeavers.xsd
Generated_Code/
Generated\ Files/
healthchecksdb
install_manifest.txt
ipch/
Makefile
MigrationBackup/
mono_crash.*
nCrunchTemp_*
node_modules/
nunit-*.xml
OpenCover/
orleans.codegen.cs
Package.StoreAssociation.xml
paket-files/
project.fragment.lock.json
project.lock.json
publish/
PublishScripts/
rcf/
ScaffoldingReadMe.txt
ServiceFabricBackup/
StyleCopReport.xml
Testing
TestResult.xml
UpgradeLog*.htm
UpgradeLog*.XML
x64/
x86/
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# Distribution / packaging
!dist/.gitkeep

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
.ruff_cache/

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# IDE
.idea/
.vscode/
*.swp
*.swo
*~

# OS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db

# Project specific
__version__.py
_private

================
File: .pre-commit-config.yaml
================
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.3.4
    hooks:
      - id: ruff
        args: [--fix]
      - id: ruff-format
        args: [--respect-gitignore]
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-toml
      - id: check-added-large-files
      - id: debug-statements
      - id: check-case-conflict
      - id: mixed-line-ending
        args: [--fix=lf]

================
File: cleanup.py
================
LOG_FILE = Path("CLEANUP.txt")
os.chdir(Path(__file__).parent)
def new() -> None:
    if LOG_FILE.exists():
        LOG_FILE.unlink()
def prefix() -> None:
    readme = Path(".cursor/rules/0project.mdc")
    if readme.exists():
        log_message("\n=== PROJECT STATEMENT ===")
        content = readme.read_text()
        log_message(content)
def suffix() -> None:
    todo = Path("TODO.md")
    if todo.exists():
        log_message("\n=== TODO.md ===")
        content = todo.read_text()
def log_message(message: str) -> None:
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with LOG_FILE.open("a") as f:
        f.write(log_line)
def run_command(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess:
        result = subprocess.run(cmd, check=check, capture_output=True, text=True)
            log_message(result.stdout)
        log_message(f"Command failed: {' '.join(cmd)}")
        log_message(f"Error: {e.stderr}")
        return subprocess.CompletedProcess(cmd, 1, "", str(e))
def check_command_exists(cmd: str) -> bool:
        subprocess.run(["which", cmd], check=True, capture_output=True)
class Cleanup:
    def __init__(self) -> None:
        self.workspace = Path.cwd()
    def _print_header(self, message: str) -> None:
        log_message(f"\n=== {message} ===")
    def _check_required_files(self) -> bool:
            if not (self.workspace / file).exists():
                log_message(f"Error: {file} is missing")
    def _generate_tree(self) -> None:
        if not check_command_exists("tree"):
            log_message("Warning: 'tree' command not found. Skipping tree generation.")
            rules_dir = Path(".cursor/rules")
            rules_dir.mkdir(parents=True, exist_ok=True)
            tree_result = run_command(
            with open(rules_dir / "filetree.mdc", "w") as f:
                f.write("---\ndescription: File tree of the project\nglobs: \n---\n")
                f.write(tree_text)
            log_message("\nProject structure:")
            log_message(tree_text)
            log_message(f"Failed to generate tree: {e}")
    def _git_status(self) -> bool:
        result = run_command(["git", "status", "--porcelain"], check=False)
        return bool(result.stdout.strip())
    def _venv(self) -> None:
        log_message("Setting up virtual environment")
            run_command(["uv", "venv"])
            if venv_path.exists():
                os.environ["VIRTUAL_ENV"] = str(self.workspace / ".venv")
                log_message("Virtual environment created and activated")
                log_message("Virtual environment created but activation failed")
            log_message(f"Failed to create virtual environment: {e}")
    def _install(self) -> None:
        log_message("Installing package with all extras")
            self._venv()
            run_command(["uv", "pip", "install", "-e", ".[test,dev]"])
            log_message("Package installed successfully")
            log_message(f"Failed to install package: {e}")
    def _run_checks(self) -> None:
        log_message("Running code quality checks")
            log_message(">>> Running code fixes...")
            run_command(
            log_message(">>> Running type checks...")
            run_command(["python", "-m", "mypy", "src", "tests"], check=False)
            log_message(">>> Running tests...")
            run_command(["python", "-m", "pytest", "tests"], check=False)
            log_message("All checks completed")
            log_message(f"Failed during checks: {e}")
    def status(self) -> None:
        prefix()  # Add project statement (.cursor/rules/0project.mdc) at start
        self._print_header("Current Status")
        self._check_required_files()
        self._generate_tree()
        result = run_command(["git", "status"], check=False)
        self._print_header("Environment Status")
        self._install()
        self._run_checks()
        suffix()  # Add TODO.md content at end
    def venv(self) -> None:
        self._print_header("Virtual Environment Setup")
    def install(self) -> None:
        self._print_header("Package Installation")
    def update(self) -> None:
        self.status()
        if self._git_status():
            log_message("Changes detected in repository")
                run_command(["git", "add", "."])
                run_command(["git", "commit", "-m", commit_msg])
                log_message("Changes committed successfully")
                log_message(f"Failed to commit changes: {e}")
            log_message("No changes to commit")
    def push(self) -> None:
        self._print_header("Pushing Changes")
            run_command(["git", "push"])
            log_message("Changes pushed successfully")
            log_message(f"Failed to push changes: {e}")
def repomix(
            cmd.append("--compress")
            cmd.append("--remove-empty-lines")
            cmd.append("-i")
            cmd.append(ignore_patterns)
        cmd.extend(["-o", output_file])
        run_command(cmd)
        log_message(f"Repository content mixed into {output_file}")
        log_message(f"Failed to mix repository: {e}")
def print_usage() -> None:
    log_message("Usage:")
    log_message("  cleanup.py status   # Show current status and run all checks")
    log_message("  cleanup.py venv     # Create virtual environment")
    log_message("  cleanup.py install  # Install package with all extras")
    log_message("  cleanup.py update   # Update and commit changes")
    log_message("  cleanup.py push     # Push changes to remote")
def main() -> NoReturn:
    new()  # Clear log file
    if len(sys.argv) < 2:
        print_usage()
        sys.exit(1)
    cleanup = Cleanup()
            cleanup.status()
            cleanup.venv()
            cleanup.install()
            cleanup.update()
            cleanup.push()
        log_message(f"Error: {e}")
    repomix()
    main()

================
File: LICENSE
================
MIT License

Copyright (c) 2025 Adam Twardoch

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

================
File: LOG.md
================
---
this_file: LOG.md
---

# Development Log

## Changed

- Updated `catbox.py` to properly implement both `Provider` and `ProviderClient` protocols
  - Fixed method signatures to match protocol requirements
  - Improved error handling and logging
  - Added proper type hints for async/await patterns
  - Simplified code by removing redundant type variables
  - Made `get_credentials` and `get_provider` class methods
  - Fixed `ProviderHelp` type usage

- Updated `bashupload.py` to use shared utilities and follow protocol patterns
  - Removed dependency on `simple.py` base class
  - Properly implemented both `Provider` and `ProviderClient` protocols
  - Added async support with proper type hints
  - Simplified provider help dictionary
  - Improved error handling and logging
  - Added file validation
  - Standardized HTTP response handling
  - Fixed timeout handling in aiohttp client

## In Progress

- Refactoring upload providers to use shared utilities
  - Created `utils.py` with common functionality
  - Updated `catbox.py` and `bashupload.py` as example implementations
  - Need to update remaining providers to follow the same pattern

## Next Steps

1. Update remaining providers to use the shared utilities from `utils.py`
2. Ensure consistent async/await patterns across all providers
3. Add helper functions in `utils.py` for common async patterns if needed
4. Consider adding more utility functions for:
   - Standardized error handling
   - File validation and handling
   - HTTP response processing
   - Credential management
   - Logging and metrics

## Technical Debt

- [ ] Update all provider implementations to match the new protocol type hints
- [ ] Consider adding helper functions in `utils.py` for common async patterns
- [ ] Review and possibly simplify the provider protocols
- [ ] Add more comprehensive error handling in utility functions
- [ ] Consider adding retry mechanisms in utility functions
- [ ] Add tests for utility functions
- [ ] Document best practices for implementing providers

# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Changed
- Updated Provider protocols to better handle async/coroutine types
  - Modified return type of async_upload_file to accept both Awaitable[UploadResult] and Coroutine[Any, Any, UploadResult]
  - Added type variables for covariant return types (T_co, T_ret)
  - This change allows for more flexible async implementations while maintaining type safety
  - Providers can now use either async/await pattern or coroutines without type conflicts

### Technical Debt
- Need to update all provider implementations to match new protocol type hints
- Consider adding helper functions in utils.py to handle common async patterns

================
File: MANIFEST.in
================
# This MANIFEST.in file ensures that JPEG files in the data directory are included in the package distribution.
recursive-include src/twat_fs/data *.jpg

================
File: mypy.ini
================
[mypy]
python_version = 3.12
ignore_missing_imports = True
disallow_untyped_defs = False
warn_return_any = True
warn_unused_configs = True

[mypy-dropbox.*]
ignore_missing_imports = True

[mypy-fire.*]
ignore_missing_imports = True

[mypy-boto3.*]
ignore_missing_imports = True

[mypy-responses.*]
ignore_missing_imports = True

[mypy-twat_fs.upload_providers.*]
disallow_untyped_defs = False

[mypy-tests.*]
disallow_untyped_defs = False

================
File: pyproject.toml
================
# this_file: pyproject.toml
# this_project: twat_fs

[project]
classifiers = [
  'Development Status :: 4 - Beta',
  'Programming Language :: Python',
  'Programming Language :: Python :: 3.10',
  'Programming Language :: Python :: 3.11',
  'Programming Language :: Python :: 3.12',
  'Programming Language :: Python :: Implementation :: CPython',
  'Programming Language :: Python :: Implementation :: PyPy',
]
dependencies = [
  'aiohappyeyeballs>=2.4.6',
  'aiohttp>=3.11.12',
  'aiosignal>=1.3.2',
  'attrs>=25.1.0',
  'fire>=0.6.0',
  'frozenlist>=1.5.0',
  'loguru>=0.7.2',
  'multidict>=6.1.0',
  'propcache>=0.2.1',
  'requests>=2.31.0',
  'tenacity>=8.0.0',
  'twat>=1.8.1',
  'yarl>=1.18.3',
]
description = 'File system utilities for twat with support for multiple upload providers'
dynamic = ['version']
keywords = [
  'file-upload',
  'fal',
  'dropbox',
  's3',
  'twat',
]
license = 'MIT'
name = 'twat-fs'
readme = 'README.md'
requires-python = '>=3.10'

[[project.authors]]
email = 'adam+github@twardoch.com'
name = 'Adam Twardoch'

[project.optional-dependencies]
all = [
  'aiohappyeyeballs>=2.4.6',
  'aiohttp>=3.11.12',
  'aiosignal>=1.3.2',
  'attrs>=25.1.0',
  'boto3>=1.36.22',
  'botocore>=1.36.22',
  'dropbox>=12.0.2',
  'fal-client>=0.5.9',
  'fire>=0.6.0',
  'frozenlist>=1.5.0',
  'loguru>=0.7.2',
  'multidict>=6.1.0',
  'propcache>=0.2.1',
  'requests>=2.31.0',
  'tenacity>=9.0.0',
  'twat>=1.8.1',
  'yarl>=1.18.3',
]
dev = [
  'botocore-stubs<=1.36.22',
  'hatch>=1.14.0',
  'hatchling>=1.27.0',
  'hatch-vcs>=0.4.0',
  'mypy-boto3-s3<=1.36.21',
  'mypy-boto3-sts<=1.36.0',
  'mypy>=1.15.0',
  'pre-commit>=4.1.0',
  'pyupgrade>=3.19.1',
  'ruff>=0.9.6',
  'types-awscrt>=0.23.10',
  'types-boto3>=1.36.22',
  'types-s3transfer>=0.11.2',
  'argparse-types',
  'botocore-types',
  'http-types',
  'json-types',
  'litellm-types',
  'types-aioboto3',
  'types-aiobotocore',
  'types-aiofiles',
  'types-attrs',
  'types-backports',
  'types-beautifulsoup4',
  'types-botocore',
  'types-cachetools',
  'types-jinja2',
  'types-lxml',
  'types-markdown',
  'types-pyyaml',
  'types-regex',
  'types-toml',
  'types-tqdm',
]
dropbox = ['dropbox>=12.0.2']
fal = ['fal-client>=0.5.9']
s3 = [
  'boto3>=1.36.22',
  'botocore>=1.36.22',
]
test = [
  'pytest>=8.3.4',
  'pytest-cov>=6.0.0',
  'pytest-benchmark>=5.1.0',
  'pytest-mock>=3.14.0',
  'pytest-asyncio>=0.25.3',
  'pytest-timeout>=2.3.1',
]

[project.scripts]
twat-fs = 'twat_fs.__main__:main'
[project.entry-points."twat.plugins"]
fs = 'twat_fs'

[project.urls]
Documentation = 'https://github.com/twardoch/twat-fs#readme'
Issues = 'https://github.com/twardoch/twat-fs/issues'
Source = 'https://github.com/twardoch/twat-fs'

[build-system]
build-backend = 'hatchling.build'
requires = [
  'hatchling>=1.27.0',
  'hatch-vcs>=0.4.0',
]
[tool.coverage.paths]
tests = [
  'tests',
  '*/twat-fs/tests',
]
twat_fs = [
  'src/twat_fs',
  '*/twat-fs/src/twat_fs',
]

[tool.coverage.report]
exclude_lines = [
  'no cov',
  'if __name__ == .__main__.:',
  'if TYPE_CHECKING:',
]

[tool.coverage.run]
branch = true
omit = ['src/twat_fs/__about__.py']
parallel = true
source_pkgs = [
  'twat_fs',
  'tests',
]
[tool.hatch.build.hooks.vcs]
version-file = 'src/twat_fs/__version__.py'
[tool.hatch.build.targets.wheel]
include = [
  'src/twat_fs/**/*.py',
  'src/twat/**/*.py',
  'src/twat_fs/py.typed',
]
packages = [
  'src/twat_fs',
  'src/twat',
]
[tool.hatch.envs.default]
dependencies = [
  'pytest>=8.3.4',
  'pytest-cov>=6.0.0',
  'ruff>=0.9.6',
  'mypy>=1.15.0',
]

[tool.hatch.envs.default.scripts]
fix = [
  'ruff check  --fix --unsafe-fixes src/twat_fs tests',
  'ruff format --respect-gitignore src/twat_fs tests',
]
lint = [
  'ruff check src/twat_fs tests',
  'ruff format --respect-gitignore src/twat_fs tests',
]
test = 'pytest {args:tests}'
test-cov = 'pytest --cov-report=term-missing --cov-config=pyproject.toml --cov=src/twat_fs --cov=tests {args:tests}'
type-check = 'mypy src/twat_fs tests'
[[tool.hatch.envs.all.matrix]]
python = [
  '3.10',
  '3.11',
  '3.12',
]

[tool.hatch.envs.lint]
dependencies = [
  'pytest>=8.3.4',
  'pytest-cov>=6.0.0',
  'ruff>=0.9.6',
  'mypy>=1.15.0',
]
detached = true

[tool.hatch.envs.lint.scripts]
all = [
  'style',
  'typing',
]
fmt = [
  'ruff format --respect-gitignore {args:.}',
  'ruff check --fix {args:.}',
]
style = [
  'ruff check {args:.}',
  'ruff format --respect-gitignore {args:.}',
]
typing = 'mypy --install-types --non-interactive {args:src/twat_fs tests}'

[tool.hatch.envs.test]
dependencies = [
  'pytest>=8.3.4',
  'pytest-cov>=6.0.0',
  'boto3>=1.36.22',
  'botocore>=1.36.22',
  'dropbox>=12.0.2',
  'fal-client>=0.5.9',
]
python = '3.10'

[tool.hatch.envs.test.scripts]
bench = 'python -m pytest -v -p no:briefcase tests/test_benchmark.py --benchmark-only'
bench-save = 'python -m pytest -v -p no:briefcase tests/test_benchmark.py --benchmark-only --benchmark-json=benchmark/results.json'
test = 'python -m pytest -n auto -p no:briefcase {args:tests}'
test-cov = 'python -m pytest -n auto -p no:briefcase --cov-report=term-missing --cov-config=pyproject.toml --cov=src/twat_fs --cov=tests {args:tests}'

[tool.hatch.version]
source = 'vcs'

[tool.hatch.version.raw-options]
version_scheme = 'post-release'

[tool.mypy]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
no_implicit_optional = true
python_version = '3.10'
warn_no_return = true
warn_redundant_casts = true
warn_return_any = true
warn_unreachable = true
warn_unused_configs = true
warn_unused_ignores = true

[tool.ruff]
line-length = 88
target-version = 'py310'

[tool.ruff.lint]
extend-select = [
  'A',
  'ARG',
  'B',
  'C',
  'DTZ',
  'E',
  'EM',
  'F',
  'FBT',
  'I',
  'ICN',
  'ISC',
  'N',
  'PLC',
  'PLE',
  'PLR',
  'PLW',
  'Q',
  'RUF',
  'S',
  'T',
  'TID',
  'UP',
  'W',
  'YTT',
]
ignore = [
  'ARG001',
  'E501',
  'I001',
  'RUF001',
  'PLR2004',
  'EXE003',
  'ISC001',
]

[tool.ruff.lint.per-file-ignores]
"tests/*" = ['S101']
[tool.pytest.ini_options]
addopts = '-v --durations=10 -p no:briefcase'
asyncio_mode = 'auto'
asyncio_default_fixture_loop_scope = 'function'
console_output_style = 'progress'
filterwarnings = [
  'ignore::DeprecationWarning',
  'ignore::UserWarning',
  'ignore:Unable to refresh access token without refresh token and app key:UserWarning',
]
log_cli = true
log_cli_level = 'INFO'
markers = [
  '''benchmark: marks tests as benchmarks (select with '-m benchmark')''',
  'unit: mark a test as a unit test',
  'integration: mark a test as an integration test',
  'permutation: tests for permutation functionality',
  'parameter: tests for parameter parsing',
  'prompt: tests for prompt parsing',
]
norecursedirs = [
  '.*',
  'build',
  'dist',
  'venv',
  '__pycache__',
  '*.egg-info',
  '_private',
]
python_classes = ['Test*']
python_files = ['test_*.py']
python_functions = ['test_*']
testpaths = ['tests']

[tool.pytest-benchmark]
compare = [
  'min',
  'max',
  'mean',
  'stddev',
  'median',
  'iqr',
  'ops',
  'rounds',
]
histogram = true
min_rounds = 100
min_time = 0.1
save-data = true
storage = 'file'

================
File: README.md
================
# twat-fs

File system utilities for twat, focusing on robust and extensible file upload capabilities with multiple provider support.

## Rationale

`twat-fs` provides a unified interface for uploading files to various storage providers while addressing common challenges:

* **Provider Flexibility**: Seamlessly switch between storage providers without code changes
* **Smart Fallback**: Intelligent retry and fallback between providers:
  * One retry with exponential backoff for temporary failures
  * Automatic fallback to next provider for permanent failures
  * Clear distinction between retryable and non-retryable errors
* **URL Validation**: Ensures returned URLs are accessible before returning them
* **Progressive Enhancement**: Start simple with zero configuration (simple providers), scale up to advanced providers (S3, Dropbox) as needed
* **Developer Experience**: Clear interfaces, comprehensive type hints, and runtime checks
* **Extensibility**: Well-defined provider protocol for adding new storage backends

## Quick Start

### Installation

Basic installation with simple providers:

```bash
uv pip install twat-fs
```

Install with all providers and development tools:

```bash
uv pip install 'twat-fs[all,dev]'
```

### Basic Usage

```python
from twat_fs import upload_file

# Simple upload (uses catbox.moe by default)
url = upload_file("path/to/file.txt")

# Specify provider with fallback
url = upload_file("path/to/file.txt", provider=["s3", "dropbox", "catbox"])

# Handle provider-specific errors
from twat_fs.upload_providers.core import RetryableError, NonRetryableError

try:
    url = upload_file("file.txt", provider="s3")
except RetryableError as e:
    print(f"Temporary error with {e.provider}: {e}")
except NonRetryableError as e:
    print(f"Permanent error with {e.provider}: {e}")
```

### Command Line Interface

```bash
# Simple upload
python -m twat_fs upload_file path/to/file.txt

# Specify provider with fallback
python -m twat_fs upload_file path/to/file.txt --provider s3,dropbox,catbox

# Disable fallback (fail immediately if provider fails)
python -m twat_fs upload_file path/to/file.txt --provider s3 --fragile

# Check provider setup
python -m twat_fs setup provider s3
python -m twat_fs setup all
```

## Provider Configuration

### Provider Fallback System

The package implements a robust provider fallback system:

1. **Circular Fallback**: When using multiple providers, if a provider fails, the system will:
   * Try the next provider in the list
   * If all remaining providers fail, start over from the beginning of the full provider list
   * Continue until all providers have been tried once
   * Each provider is only tried once to avoid infinite loops

2. **Fragile Mode**: For cases where fallback is not desired:
   * In the CLI: pass the `--fragile` flag
   * In code: `upload_file(..., fragile=True)`
   * System will fail immediately if the requested provider fails
   * No fallback attempts will be made

Example fallback scenarios:

```python
# Full circular fallback (if E fails, tries F, G, A, B, C, D)
url = upload_file("file.txt", provider="E")

# Fragile mode (fails immediately if E fails)
url = upload_file("file.txt", provider="E", fragile=True)

# Custom provider list with circular fallback
# If C fails, tries A, then B
url = upload_file("file.txt", provider=["C", "A", "B"])
```
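
The circular order described above can be sketched as a simple list rotation. This is only an illustration of the ordering rule, not the package's actual implementation; `fallback_order` is a hypothetical helper name, and the single-letter provider names are the same placeholders used in the scenarios above.

```python
def fallback_order(providers: list[str], start: str) -> list[str]:
    """Rotate `providers` so `start` comes first; every provider appears once."""
    if start not in providers:
        msg = f"Invalid provider: {start}"
        raise ValueError(msg)
    index = providers.index(start)
    # Providers after `start` are tried first, then the list wraps around.
    return providers[index:] + providers[:index]


order = fallback_order(["A", "B", "C", "D", "E", "F", "G"], "E")
print(order)  # ['E', 'F', 'G', 'A', 'B', 'C', 'D']
```

Because the result is a rotation of a finite list, each provider is visited at most once, which is what rules out infinite loops.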

### Simple Providers (No Configuration Required)

The following providers work out of the box with no configuration:

* **catbox.moe**: General file uploads (default)
* **litterbox.catbox.moe**: Temporary file uploads with expiration
* **www0x0.st**: General file uploads
* **uguu.se**: Temporary file uploads
* **bashupload.com**: General file uploads
* **filebin.net**: Temporary file uploads (6-day expiration)
* **pixeldrain.com**: General file uploads

### Dropbox

```bash
export DROPBOX_ACCESS_TOKEN="your_token_here"
# Optional OAuth2 configuration
export DROPBOX_REFRESH_TOKEN="refresh_token"
export DROPBOX_APP_KEY="app_key"
export DROPBOX_APP_SECRET="app_secret"
```

### AWS S3

```bash
# Required
export AWS_S3_BUCKET="your_bucket"
export AWS_DEFAULT_REGION="us-east-1"

# Authentication (choose one)
export AWS_ACCESS_KEY_ID="key_id"
export AWS_SECRET_ACCESS_KEY="secret_key"
# Or use AWS CLI: aws configure
# Or use IAM roles in AWS infrastructure

# Optional
export AWS_ENDPOINT_URL="custom_endpoint"  # For S3-compatible services
export AWS_S3_PATH_STYLE="true"  # For path-style endpoints
export AWS_ROLE_ARN="role_to_assume"
```

### FAL.ai

```bash
export FAL_KEY="your_key_here"
```
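
Before uploading, it can be handy to see which of the configured providers have their environment variables set. The sketch below is an assumption-laden illustration: `REQUIRED_ENV` and `missing_env` are hypothetical names, and the variable lists are taken only from the sections above (S3 authentication can also come from the AWS CLI or IAM roles, so the key variables are not treated as required). The package's own credential checks may be stricter.

```python
import os
from collections.abc import Mapping

# Required variables per provider, as listed in the sections above.
REQUIRED_ENV: dict[str, tuple[str, ...]] = {
    "dropbox": ("DROPBOX_ACCESS_TOKEN",),
    "s3": ("AWS_S3_BUCKET", "AWS_DEFAULT_REGION"),
    "fal": ("FAL_KEY",),
}


def missing_env(provider: str, env: Mapping[str, str] = os.environ) -> list[str]:
    """Hypothetical helper: list required variables that are unset or empty."""
    return [name for name in REQUIRED_ENV[provider] if not env.get(name)]


for name in REQUIRED_ENV:
    gaps = missing_env(name)
    print(name, "ready" if not gaps else f"missing: {', '.join(gaps)}")
```

For an authoritative check, `python -m twat_fs setup all` remains the supported route.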

## Architecture

### Provider System

The package uses a provider-based architecture with these key components:

1. **Provider Registry**: Central registry of available providers
   * Maintains provider preference order
   * Handles lazy loading of provider modules
   * Provides runtime protocol checking
   * Manages provider fallback chain

2. **Provider Protocol**: Formal interface that all providers must implement
   * Credentials management
   * Client initialization
   * File upload functionality
   * Help and setup information
   * Error classification (retryable vs. non-retryable)

3. **Provider Client**: The actual implementation that handles uploads
   * Provider-specific upload logic
   * Error handling and retries
   * URL validation
   * Progress tracking (where supported)

4. **Error Handling**: Structured error hierarchy
   * RetryableError: Temporary failures (rate limits, timeouts)
   * NonRetryableError: Permanent failures (auth, invalid files)
   * Automatic retry with exponential backoff
   * Provider fallback for permanent failures
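
The interplay of retries and fallback described in the error-handling component can be sketched roughly as follows. This is an illustration under stated assumptions, not the package's implementation: the two exception classes are local stand-ins for the real ones, `upload_with_fallback` and its parameters are hypothetical, and the sketch assumes that a provider whose retries are exhausted is also skipped in favour of the next one.

```python
import time
from typing import Callable


class RetryableError(Exception):
    """Stand-in for a temporary failure (rate limit, timeout)."""


class NonRetryableError(Exception):
    """Stand-in for a permanent failure (auth, invalid file)."""


def upload_with_fallback(
    uploaders: dict[str, Callable[[str], str]],
    path: str,
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Try each uploader in order; retry temporary failures with backoff."""
    last_error = None
    for name, upload in uploaders.items():
        for attempt in range(max_retries):
            try:
                return upload(path)
            except RetryableError as exc:
                last_error = exc
                if attempt < max_retries - 1:
                    # Delay doubles on each attempt: 1s, 2s, 4s, ...
                    sleep(base_delay * 2**attempt)
            except NonRetryableError as exc:
                last_error = exc
                break  # permanent failure: move on to the next provider
    raise NonRetryableError(f"All providers failed: {last_error}")
```

Passing `sleep` as a parameter keeps the backoff testable without real waiting.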

### Type System

Strong typing throughout with runtime checks:

* Type hints for all public APIs
* Runtime protocol verification
* Custom types for provider-specific data
* Error type hierarchy
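
Runtime protocol verification of this kind is typically built on `typing.Protocol` with `@runtime_checkable`. The sketch below shows the mechanism in isolation; `UploadClient`, `DummyClient`, and `NotAClient` are hypothetical names, and the package's own protocol may declare more members than this.

```python
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class UploadClient(Protocol):
    """Hypothetical protocol: anything with an upload_file method."""

    def upload_file(self, local_path: str, remote_path: Optional[str] = None) -> str: ...


class DummyClient:
    def upload_file(self, local_path: str, remote_path: Optional[str] = None) -> str:
        return f"https://example.invalid/{remote_path or local_path}"


class NotAClient:
    pass


# isinstance() only checks that the method exists, not its signature.
print(isinstance(DummyClient(), UploadClient))  # True
print(isinstance(NotAClient(), UploadClient))   # False
```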

## Implementing a New Provider

To add a new storage provider, create a module in `twat_fs/upload_providers/` that implements the Provider protocol:

```python
from pathlib import Path
from typing import Any, TypedDict
from twat_fs.upload_providers import ProviderClient, Provider

# Provider-specific help messages
PROVIDER_HELP = {
    "setup": """Setup instructions for users...""",
    "deps": """Additional dependencies needed..."""
}

def get_credentials() -> dict[str, Any] | None:
    """
    Get provider credentials from environment.
    Return None if not configured.
    """
    # Implement credential checking
    ...

def get_provider() -> ProviderClient | None:
    """
    Initialize and return the provider client.
    Only import provider-specific dependencies here.
    """
    creds = get_credentials()
    if not creds:
        return None
    
    try:
        # Initialize your provider client
        client = YourProviderClient(creds)
        return client
    except Exception:
        return None

def upload_file(local_path: str | Path, remote_path: str | Path | None = None) -> str:
    """
    Upload a file and return its public URL.
    This is a convenience wrapper around get_provider().
    """
    client = get_provider()
    if not client:
        raise ValueError("Provider not configured")
    return client.upload_file(local_path, remote_path)

# Your provider client implementation
class YourProviderClient:
    def upload_file(
        self, 
        local_path: str | Path, 
        remote_path: str | Path | None = None
    ) -> str:
        """Implement the actual upload logic."""
        ...
```

Then add your provider to `PROVIDERS_PREFERENCE` in `upload_providers/__init__.py`.
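
How a preference list and lazy loading fit together can be sketched as follows. This is an illustration, not the package's code: `PREFERENCE` and `load_provider_module` are hypothetical names, and standard-library modules stand in for provider modules so that the example runs anywhere.

```python
import importlib
from functools import lru_cache
from types import ModuleType
from typing import Optional

# Stand-in names: real provider modules would live under a package path;
# stdlib modules are used here only so the sketch runs anywhere.
PREFERENCE = ["json", "no_such_provider_module", "csv"]


@lru_cache(maxsize=None)
def load_provider_module(name: str) -> Optional[ModuleType]:
    """Import a module on first use and cache the result; None if unavailable."""
    try:
        return importlib.import_module(name)
    except ImportError:
        return None


# Modules are imported only when they are first asked for, in preference order.
available = [name for name in PREFERENCE if load_provider_module(name) is not None]
print(available)  # ['json', 'csv']
```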

## Development

### Setup Environment

```bash
# Install development tools
pip install uv

# Create and activate environment
uv venv
source .venv/bin/activate

# Install in development mode with all extras
uv pip install -e '.[dev,all,test]'
```

### Code Quality

```bash
# Format and lint code
python -m ruff format src tests
python -m ruff check --fix --unsafe-fixes src tests

# Run type checks
python -m mypy src tests

# Run tests
python -m pytest tests
python -m pytest --cov=src/twat_fs tests  # with coverage

# Quick development cycle
./cleanup.py install  # Set up environment
./cleanup.py status  # Run all checks
```

### Publish

Make sure the following is set in your environment:

```bash
export UV_PUBLISH_TOKEN="${PYPI_TOKEN}"
```

Build and publish:

```bash
VER="v1.7.9" && echo "$VER" > VERSION.txt && git commit -am "$VER" && git tag "$VER"
uv build && uv publish
```

### Testing

The test suite includes:

* Unit tests for each provider
* Integration tests with real services
* Performance tests for large files
* Error condition tests
* Type checking tests

When adding a new provider:

1. Add unit tests in `tests/test_providers/`
2. Add integration tests in `tests/test_integration.py`
3. Add performance tests if relevant
4. Update provider discovery tests
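
A unit test for a new provider might take roughly the following shape. This is a sketch under assumptions: `FakeProviderClient` is a hypothetical in-memory stand-in for your client, and the test names are illustrative rather than taken from the existing suite. Plain `assert` statements are used so the functions work under pytest without extra imports.

```python
import tempfile
from pathlib import Path


class FakeProviderClient:
    """Hypothetical in-memory client used only to illustrate the test shape."""

    def upload_file(self, local_path, remote_path=None) -> str:
        path = Path(local_path)
        if not path.is_file():
            raise FileNotFoundError(path)
        return f"https://example.invalid/{remote_path or path.name}"


def test_upload_returns_url() -> None:
    # Create a real temporary file so the client's existence check passes.
    with tempfile.TemporaryDirectory() as tmp:
        sample = Path(tmp) / "test.txt"
        sample.write_text("hello")
        url = FakeProviderClient().upload_file(sample)
    assert url == "https://example.invalid/test.txt"


def test_missing_file_raises() -> None:
    try:
        FakeProviderClient().upload_file("does-not-exist.txt")
    except FileNotFoundError:
        return
    raise AssertionError("expected FileNotFoundError")
```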

## Error Handling & Troubleshooting

### Error Types

The package uses a structured error hierarchy for better error handling:

```python
from twat_fs.upload_providers.core import (
    UploadError,              # Base class for all upload errors
    RetryableError,           # Temporary failures that should be retried
    NonRetryableError,        # Permanent failures that trigger fallback
)
```

### Common Issues

1. **Temporary Failures (RetryableError)**
   * Rate limiting
   * Network timeouts
   * Server errors (503, 504)
   * Connection resets
   ```python
   try:
       url = upload_file("file.txt")
   except RetryableError as e:
       print(f"Temporary error with {e.provider}: {e}")
       # Will be retried automatically with exponential backoff
   ```

2. **Permanent Failures (NonRetryableError)**
   * Authentication failures
   * Invalid files
   * Missing permissions
   * Provider not available
   ```python
   try:
       url = upload_file("file.txt", provider=["s3", "dropbox"])
   except NonRetryableError as e:
       print(f"All providers failed. Last error from {e.provider}: {e}")
   ```

3. **URL Validation**
   * All returned URLs are validated with a HEAD request
   * Follows redirects
   * Verifies accessibility
   * Retries on temporary failures
   ```python
   # URL is guaranteed to be accessible when returned
   url = upload_file("file.txt")
   ```
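
A HEAD-based accessibility check of the kind described in item 3 can be sketched with the standard library alone. This is not the package's validator: `is_url_accessible` and its `opener` parameter are hypothetical, and the sketch omits the retry behaviour mentioned above.

```python
import urllib.error
import urllib.request


def is_url_accessible(url: str, timeout: float = 10.0, opener=urllib.request.urlopen) -> bool:
    """Send a HEAD request and report whether the final response is 2xx.

    urlopen follows redirects by default and raises HTTPError (a URLError
    subclass) for 4xx/5xx responses, which is treated as "not accessible".
    """
    request = urllib.request.Request(url, method="HEAD")
    try:
        with opener(request, timeout=timeout) as response:
            return 200 <= response.status < 300
    except (urllib.error.URLError, TimeoutError):
        return False
```

The `opener` argument exists only so the check can be exercised in tests without network access; a typical call is `is_url_accessible(url)`.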

### Provider Status Checking

Use the setup commands to diagnose provider issues:

```bash
# Check specific provider
python -m twat_fs setup provider s3

# Check all providers
python -m twat_fs setup all
```

### Logging

The package uses `loguru` for structured logging:

```python
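import sys

from loguru import logger

# Remove the default handler so the level and format below take effect
logger.remove()

# Provide a default so the format below works when no provider is bound
logger.configure(extra={"provider": "-"})

# Log format includes provider info; the level is set per handler
logger.add(
    sys.stderr,
    level="DEBUG",
    format="{time} {level} [{extra[provider]}] {message}",
)

# Add file handler
logger.add("twat_fs.log", rotation="1 day")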
```

### Debugging Provider Issues

When implementing a new provider:

1. Enable debug logging:

```python
import sys
from loguru import logger
logger.remove()  # drop the default handler before adding a DEBUG one
logger.add(sys.stderr, level="DEBUG")
```

2. Use the provider test helper:

```python
from twat_fs.testing import ProviderTestHelper

helper = ProviderTestHelper("your_provider")
helper.test_provider_implementation()  # Checks protocol compliance
helper.test_provider_functionality()   # Tests basic operations
```

3. Check provider initialization:

```python
from twat_fs.upload_providers import get_provider_module

provider = get_provider_module("your_provider")
print(provider.get_credentials())  # Check credential loading
print(provider.get_provider())     # Check client initialization
```

## License

MIT License

## Extra

```bash
for PROVIDER in $(twat fs upload_provider list 2>/dev/null); do URL="$(twat fs upload "./src/twat_fs/data/test.jpg" --provider "$PROVIDER")"; echo "[$PROVIDER]($URL)"; done
```

```
Error: Upload failed: Failed to upload with catbox: Unexpected error: URL validation failed (status 503)
[catbox]()
[litterbox](https://litter.catbox.moe/8a6jf0.jpg)
[fal](https://v3.fal.media/files/monkey/Kd6SwMGEIbxMIFPihlFQL_test.jpg)
[bashupload](https://bashupload.com/TTHlX/test.jpg?download=1)
[uguu](https://d.uguu.se/RrhFSqLP.jpg)
[www0x0](https://0x0.st/8qUT.jpg)
[filebin](https://filebin.net/twat-fs-1739859030-enq2xe/test.jpg)
```

================
File: TODO.md
================
---
this_file: TODO.md
---

# TODO

- [x] Create utils.py to centralize common functionality for upload providers
  
  A `utils.py` file has been created in the `upload_providers` folder with several helper functions:
  - `create_provider_help` for standardizing provider help dictionaries
  - `safe_file_handle` for file operations
  - `validate_file` for file validation
  - `handle_http_response` for HTTP response handling
  - `get_env_credentials` for credential management
  - `create_provider_instance` for provider initialization
  - `standard_upload_wrapper` for consistent upload handling
  - `log_upload_attempt` for consistent logging

- [ ] Refactor provider modules to use utils.py consistently
  
  Most providers still use their own implementations for common operations. The priority is to:
  1. Replace duplicated code with calls to utils.py functions
  2. Ensure consistent error handling across providers
  3. Standardize method signatures and patterns

- [ ] Implement a factory pattern for provider instantiation
  
  Create a provider factory that simplifies the creation of providers and reduces code duplication in `get_provider()` methods.

- [ ] Standardize async/sync conversion patterns
  
  Several providers implement both synchronous and asynchronous upload methods with nearly identical conversion logic. Standardize this pattern using the utilities in utils.py.

- [ ] Refine HTTP response handling across providers
  
  While `handle_http_response()` exists in utils.py, ensure all providers use it consistently for handling various HTTP status codes, especially for error conditions like rate limits (429) and other non-200 responses.

- [ ] Create provider base classes to reduce inheritance boilerplate
  
  Implement base classes that providers can inherit from to reduce duplicate code:
  - A base class for simple providers with no credentials
  - A base class for async-capable providers
  - A base class for providers requiring credentials

- [ ] Add comprehensive type annotations for async operations
  
  With the protocol updates to support both `Awaitable[UploadResult]` and `Coroutine[Any, Any, UploadResult]`, ensure consistent type handling in:
  - Async/await conversions
  - Async decorators with proper type hints
  - Async context managers and resource cleanup

- [ ] Write unit tests for utils.py functions
  
  Ensure the centralized utilities are thoroughly tested to prevent regressions when refactoring providers.

- [ ] Document best practices for creating new providers
  
  Create clear documentation for adding new providers, showing how to leverage the shared utilities and follow the established patterns.
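
As one illustration of the async/sync conversion item above, a shared wrapper could look like the sketch below. It is an assumption about one possible approach, not a description of what `utils.py` contains; `to_sync` and `async_upload_file` are hypothetical names.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine, TypeVar

T = TypeVar("T")


def to_sync(func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., T]:
    """Wrap a coroutine function so it can be called synchronously.

    Uses asyncio.run, so the wrapper must not be called from code that is
    already running inside an event loop.
    """

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> T:
        return asyncio.run(func(*args, **kwargs))

    return wrapper


async def async_upload_file(path: str) -> str:
    await asyncio.sleep(0)  # placeholder for real async I/O
    return f"https://example.invalid/{path}"


upload_file = to_sync(async_upload_file)
print(upload_file("file.txt"))  # https://example.invalid/file.txt
```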

================
File: update_providers.py
================
def update_provider_file(file_path: Path) -> None:
    with open(file_path) as f:
        content = f.read()
    content = content.replace("SimpleProviderBase", "BaseProvider")
    content = content.replace(
    def replace_upload_file(match):
        signature = match.group(1)
    content = re.sub(
    def replace_upload_file_body(match):
        match.group(1)
    with open(file_path, "w") as f:
        f.write(content)
def main():
    base_path = Path("src/twat_fs/upload_providers")
        update_provider_file(file_path)
    main()

================
File: VERSION.txt
================
v2.0.2



================================================================
End of Codebase
================================================================
