#!/usr/bin/env -S uv run --script
# [MISE] description="Automate the fork-backed release flow and mirror assets to upstream"
# [USAGE] about "Automate the fork-backed release flow for plexos2duckdb"
# /// script
# requires-python = ">=3.11"
# ///

from __future__ import annotations

import argparse
import datetime as dt
import json
import os
from pathlib import Path
import re
import shlex
import subprocess
import sys
import time
from typing import Any
from urllib.parse import urlparse

# Repo used when the upstream cannot be inferred from the `origin` remote.
DEFAULT_UPSTREAM_REPO: str = "epri-dev/plexos2duckdb"
# Workflow file dispatched on the fork to build the release assets.
DEFAULT_WORKFLOW: str = "cd.yml"
# Local download location; the "{tag}" placeholder is filled with the release tag.
DEFAULT_DOWNLOAD_DIR_TEMPLATE: str = "dist/releases/{tag}"
# How long to keep looking for the workflow run created by a dispatch.
RUN_LOOKUP_TIMEOUT_SECONDS: int = 90
# How long to wait for a release to list at least one asset.
RELEASE_ASSET_TIMEOUT_SECONDS: int = 120
# Delay between consecutive polling calls to gh.
POLL_INTERVAL_SECONDS: int = 3


class CommandError(RuntimeError):
    """Fatal, user-facing failure of a release step.

    Raised when an external ``git``/``gh`` command exits unsuccessfully or a
    precondition (missing fork repo, missing assets, ...) is not met. ``main``
    reports it as ``Error: <message>`` and exits with status 1.
    """


def main() -> int:
    """Parse the command line, run the selected subcommand, and return an exit status.

    Returns 0 on success and 1 when the subcommand raises; the exception is
    reported on stderr rather than as a traceback.
    """
    args = build_parser().parse_args()

    try:
        args.func(args)
    except CommandError as error:
        print(f"Error: {error}", file=sys.stderr)
    except Exception as error:
        kind = error.__class__.__name__
        print(f"Unexpected error: {kind}: {error}", file=sys.stderr)
    else:
        return 0
    return 1


def build_parser() -> argparse.ArgumentParser:
    """Construct the top-level CLI parser.

    Registers one subcommand per release step (``sync-fork``,
    ``build-on-fork``, ``download-fork-assets``, ``upload-upstream-assets``)
    plus ``mirror``, which chains them. Each subcommand stores its handler in
    the ``func`` default so ``main`` can call ``args.func(args)``.
    """
    epilog_text = (
        "Environment variables:\n"
        "  PLEXOS2DUCKDB_FORK_REPO            Fork repo in OWNER/REPO format.\n"
        "  PLEXOS2DUCKDB_UPSTREAM_REPO        Upstream repo. Default: epri-dev/plexos2duckdb\n"
        "  PLEXOS2DUCKDB_RELEASE_BRANCH       Branch to sync and tag. Default: local origin HEAD or main.\n"
        "  PLEXOS2DUCKDB_RELEASE_WORKFLOW     Workflow file name. Default: cd.yml\n"
        "  PLEXOS2DUCKDB_RELEASE_WORKFLOW_REF Ref containing the workflow file. Default: release branch.\n"
        "  PLEXOS2DUCKDB_RELEASE_DOWNLOAD_DIR Download dir template. Default: dist/releases/{tag}\n\n"
        "Examples:\n"
        "  mise run plexos2duckdb:release-via-fork -- mirror v0.1.0-beta.8 --fork-repo YOURUSER/plexos2duckdb\n"
        "  mise run plexos2duckdb:release-build-on-fork -- v0.1.0-beta.8 --fork-repo YOURUSER/plexos2duckdb\n"
        "  mise run plexos2duckdb:release-upload-upstream-assets -- v0.1.0-beta.8 --fork-repo YOURUSER/plexos2duckdb\n"
    )
    parser = argparse.ArgumentParser(
        description="Automate the fork-backed release flow for plexos2duckdb.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog_text,
    )
    commands = parser.add_subparsers(dest="command", required=True)

    # Help texts shared by more than one subcommand.
    force_sync_help = "Allow gh repo sync to hard-reset the fork branch to upstream."
    reuse_assets_help = (
        "Reuse an existing fork release with assets instead of dispatching the workflow again."
    )
    download_dir_help = (
        "Download directory or template. "
        "Default: dist/releases/{tag} or $PLEXOS2DUCKDB_RELEASE_DOWNLOAD_DIR."
    )

    def subcommand(
        name: str, summary: str, *, include_tag: bool = True
    ) -> argparse.ArgumentParser:
        # Every subcommand starts with the shared repo/branch/workflow options.
        sub = commands.add_parser(name, help=summary)
        add_repo_options(sub, include_tag=include_tag)
        return sub

    def switch(sub: argparse.ArgumentParser, option: str, text: str) -> None:
        # Every boolean option in this CLI is a plain store_true flag.
        sub.add_argument(option, action="store_true", help=text)

    sync_cmd = subcommand(
        "sync-fork", "Sync the fork branch from upstream.", include_tag=False
    )
    switch(sync_cmd, "--force-sync", force_sync_help)
    sync_cmd.set_defaults(func=cmd_sync_fork)

    build_cmd = subcommand(
        "build-on-fork",
        "Create or reuse the fork release, dispatch the workflow, and wait for assets.",
    )
    switch(build_cmd, "--allow-existing-fork-assets", reuse_assets_help)
    switch(build_cmd, "--force-sync", force_sync_help)
    switch(
        build_cmd,
        "--skip-sync",
        "Skip syncing the fork branch before dispatching the workflow.",
    )
    build_cmd.set_defaults(func=cmd_build_on_fork)

    download_cmd = subcommand(
        "download-fork-assets",
        "Download release assets from the fork release into the local dist directory.",
    )
    download_cmd.add_argument("--download-dir", help=download_dir_help)
    switch(
        download_cmd,
        "--clobber",
        "Overwrite existing local asset files while downloading.",
    )
    download_cmd.set_defaults(func=cmd_download_fork_assets)

    upload_cmd = subcommand(
        "upload-upstream-assets",
        "Create the upstream release if needed and upload downloaded assets to it.",
    )
    upload_cmd.add_argument(
        "--download-dir",
        help=(
            "Directory or template containing downloaded assets. "
            "Default: dist/releases/{tag} or $PLEXOS2DUCKDB_RELEASE_DOWNLOAD_DIR."
        ),
    )
    switch(
        upload_cmd,
        "--clobber",
        "Replace upstream assets with the same names before re-uploading them.",
    )
    upload_cmd.set_defaults(func=cmd_upload_upstream_assets)

    mirror_cmd = subcommand(
        "mirror",
        "Sync the fork, build release assets on it, download them, and upload them upstream.",
    )
    mirror_cmd.add_argument("--download-dir", help=download_dir_help)
    switch(mirror_cmd, "--allow-existing-fork-assets", reuse_assets_help)
    switch(mirror_cmd, "--force-sync", force_sync_help)
    switch(
        mirror_cmd,
        "--clobber",
        "Overwrite downloaded local assets and matching upstream release assets.",
    )
    mirror_cmd.set_defaults(func=cmd_mirror)

    return parser


def add_repo_options(parser: argparse.ArgumentParser, *, include_tag: bool) -> None:
    """Register the options shared by every subcommand on *parser*.

    When *include_tag* is true, a positional ``tag`` argument is added first.
    Option defaults come from ``PLEXOS2DUCKDB_*`` environment variables. The
    upstream repo and branch fall back to values inferred from the local git
    checkout. The workflow falls back to ``DEFAULT_WORKFLOW``.
    """
    env = os.environ
    if include_tag:
        parser.add_argument(
            "tag", help="Release tag to build and mirror, for example v0.1.0-beta.8."
        )

    # (flag, default, help) for each shared option, in registration order.
    shared_options = (
        (
            "--fork-repo",
            env.get("PLEXOS2DUCKDB_FORK_REPO"),
            "Fork repo in OWNER/REPO format. Required unless PLEXOS2DUCKDB_FORK_REPO is set.",
        ),
        (
            "--upstream-repo",
            env.get("PLEXOS2DUCKDB_UPSTREAM_REPO") or infer_upstream_repo(),
            "Upstream repo in OWNER/REPO format.",
        ),
        (
            "--branch",
            env.get("PLEXOS2DUCKDB_RELEASE_BRANCH") or infer_default_branch(),
            "Branch to sync from upstream and target when creating releases.",
        ),
        (
            "--workflow",
            env.get("PLEXOS2DUCKDB_RELEASE_WORKFLOW", DEFAULT_WORKFLOW),
            "Workflow file name to dispatch on the fork.",
        ),
        (
            "--workflow-ref",
            env.get("PLEXOS2DUCKDB_RELEASE_WORKFLOW_REF"),
            "Branch or tag containing the workflow file. Defaults to --branch.",
        ),
    )
    for flag, default, text in shared_options:
        parser.add_argument(flag, default=default, help=text)


def cmd_sync_fork(args: argparse.Namespace) -> None:
    """Handle ``sync-fork``: sync the fork branch from the upstream repository.

    Checks gh authentication and that a fork repo was given before syncing.
    """
    fork_repo = args.fork_repo
    ensure_gh_authentication()
    validate_fork_repo(fork_repo)
    sync_fork_branch(
        fork_repo=fork_repo,
        upstream_repo=args.upstream_repo,
        branch=args.branch,
        force_sync=args.force_sync,
    )


def cmd_build_on_fork(args: argparse.Namespace) -> None:
    """Handle ``build-on-fork``: build the release assets on the fork.

    The fork branch is synced first unless ``--skip-sync`` was passed.
    """
    should_sync = not args.skip_sync
    ensure_gh_authentication()
    validate_fork_repo(args.fork_repo)
    build_on_fork(args, sync_first=should_sync)


def cmd_download_fork_assets(args: argparse.Namespace) -> None:
    """Handle ``download-fork-assets``: copy the fork release's assets locally.

    Requires gh authentication and a fork repo before downloading.
    """
    fork_repo = args.fork_repo
    ensure_gh_authentication()
    validate_fork_repo(fork_repo)
    download_fork_assets(args)


def cmd_upload_upstream_assets(args: argparse.Namespace) -> None:
    """Handle ``upload-upstream-assets``: publish downloaded assets upstream.

    Requires gh authentication and a fork repo (whose release supplies the
    upstream release metadata) before uploading.
    """
    fork_repo = args.fork_repo
    ensure_gh_authentication()
    validate_fork_repo(fork_repo)
    upload_upstream_assets(args)


def cmd_mirror(args: argparse.Namespace) -> None:
    """Handle ``mirror``: sync and build on the fork, download, then upload upstream."""
    ensure_gh_authentication()
    validate_fork_repo(args.fork_repo)
    # mirror has no --skip-sync option, so the fork is always synced first.
    build_on_fork(args, sync_first=True)
    for step in (download_fork_assets, upload_upstream_assets):
        step(args)


def validate_fork_repo(fork_repo: str | None) -> None:
    if fork_repo:
        return
    raise CommandError(
        "the fork repo is required; pass --fork-repo OWNER/REPO or set "
        "PLEXOS2DUCKDB_FORK_REPO"
    )


def infer_upstream_repo() -> str:
    """Return the ``OWNER/REPO`` slug of the local ``origin`` remote on github.com.

    Falls back to ``DEFAULT_UPSTREAM_REPO`` in three cases:
    ``origin`` is not configured, its URL is empty, or the URL does not point
    at github.com.
    """
    origin_value = get_origin_url()
    if origin_value is None:
        return DEFAULT_UPSTREAM_REPO

    # Accept scp-style (git@github.com:owner/repo.git) and URL-style
    # (https://github.com/owner/repo[.git][/]) remotes. Repository names may
    # contain dots (e.g. "my.project"), so only a trailing ".git" is stripped.
    match = re.search(
        r"github\.com[:/](?P<owner>[^/]+)/(?P<repo>[^/]+?)(?:\.git)?/?$",
        origin_value,
    )
    if match:
        return f"{match.group('owner')}/{match.group('repo')}"
    return DEFAULT_UPSTREAM_REPO


def get_origin_url() -> str | None:
    """Return the URL of the local ``origin`` remote.

    Returns ``None`` when ``git remote get-url origin`` fails or prints nothing.
    """
    completed = run_command(
        ["git", "remote", "get-url", "origin"],
        capture=True,
        check=False,
        announce=False,
    )
    url = (completed.stdout or "").strip() if completed.returncode == 0 else ""
    return url or None


def infer_repo_git_url(*, repo: str, fallback_url: str) -> str:
    """Build a git URL for *repo* (``OWNER/REPO``) modelled on *fallback_url*.

    The scheme/host (including any credentials) or the scp-style ``user@host``
    of *fallback_url* is reused, so the same transport and auth apply. A
    trailing ``.git`` is added only if *fallback_url* had one.

    Inputs that are neither a URL with a host nor a real scp-style remote fall
    back to ``git@github.com:<repo>.git``. That covers local paths, ``file://``
    URLs and empty strings. Git only treats ``host:path`` as scp syntax when
    there is no slash before the first colon.
    """
    default_url = f"git@github.com:{repo}.git"

    if "://" in fallback_url:
        parsed = urlparse(fallback_url)
        if parsed.scheme and parsed.netloc:
            suffix = ".git" if parsed.path.endswith(".git") else ""
            return f"{parsed.scheme}://{parsed.netloc}/{repo}{suffix}"
        return default_url

    user_host, separator, _ = fallback_url.partition(":")
    if not separator or not user_host or "/" in user_host:
        # Not an scp-style remote (e.g. "/srv/git/repo.git" or "./a:b").
        return default_url
    suffix = ".git" if fallback_url.endswith(".git") else ""
    return f"{user_host}:{repo}{suffix}"


def infer_default_branch() -> str:
    """Return the branch that ``origin/HEAD`` points at, falling back to ``"main"``.

    ``git symbolic-ref refs/remotes/origin/HEAD`` prints a full ref such as
    ``refs/remotes/origin/release/1.x``. Only the ``refs/remotes/origin/``
    prefix is removed, so branch names containing ``/`` are preserved.
    """
    command = run_command(
        ["git", "symbolic-ref", "refs/remotes/origin/HEAD"],
        capture=True,
        check=False,
        announce=False,
    )
    if command.returncode != 0:
        return "main"

    branch_ref = (command.stdout or "").strip()
    remote_prefix = "refs/remotes/origin/"
    if branch_ref.startswith(remote_prefix):
        # Keep everything after the remote prefix: "release/1.x" must stay intact.
        branch = branch_ref[len(remote_prefix):]
    else:
        # Unexpected format: use the last path component.
        branch = branch_ref.rsplit("/", maxsplit=1)[-1]
    return branch or "main"


def ensure_gh_authentication() -> None:
    """Verify that the gh CLI is logged in.

    Raises:
        CommandError: if ``gh auth status`` exits non-zero. The message includes
            gh's stderr, or its stdout when stderr is empty.
    """
    status = run_command(["gh", "auth", "status"], capture=True, check=False)
    if status.returncode != 0:
        details = status.stderr.strip() or status.stdout.strip()
        raise CommandError(
            "gh authentication is required. Run `gh auth login -h github.com` or "
            "export a valid GH_TOKEN/GITHUB_TOKEN before running release tasks.\n"
            f"{details}"
        )


def sync_fork_branch(
    *, fork_repo: str, upstream_repo: str, branch: str, force_sync: bool
) -> None:
    """Sync ``fork_repo:branch`` from ``upstream_repo`` using ``gh repo sync``.

    If gh's output mentions ``merge-upstream`` together with ``404``, the sync
    is retried with plain git via ``sync_fork_branch_with_git``.

    Raises:
        CommandError: if ``gh repo sync`` fails for any other reason.
    """
    cmd = [
        "gh", "repo", "sync", fork_repo,
        "--source", upstream_repo,
        "--branch", branch,
    ]
    cmd += ["--force"] if force_sync else []
    result = run_command(cmd, capture=True, check=False)
    output = combine_output(result)

    if result.returncode == 0:
        if output:
            print(output, flush=True)
        return

    if not is_merge_upstream_not_supported(output):
        raise CommandError(command_failure_message(cmd, result))

    print(
        "GitHub's merge-upstream API is unavailable for this repo; "
        "falling back to git fetch/push sync.",
        flush=True,
    )
    sync_fork_branch_with_git(
        fork_repo=fork_repo,
        upstream_repo=upstream_repo,
        branch=branch,
        force_sync=force_sync,
    )


def is_merge_upstream_not_supported(output: str) -> bool:
    """Return True if *output* mentions both ``merge-upstream`` and ``404``.

    Case-insensitive heuristic for gh reporting that GitHub's merge-upstream
    endpoint is unavailable.
    """
    text = output.lower()
    return all(marker in text for marker in ("merge-upstream", "404"))


def sync_fork_branch_with_git(
    *, fork_repo: str, upstream_repo: str, branch: str, force_sync: bool
) -> None:
    """Sync ``fork_repo:branch`` from ``upstream_repo:branch`` using plain git.

    This is the fallback when ``gh repo sync`` cannot use GitHub's
    merge-upstream API. The branch is fetched directly from *upstream_repo*,
    not from whatever ``origin`` happens to be, into FETCH_HEAD, then pushed to
    the fork. The local ``origin`` URL is used only as a template for the
    transport and credentials of both URLs.

    Raises:
        CommandError: if the fetch or the push fails. Without *force_sync*, git
            rejects a push that does not fast-forward the fork branch.
    """
    origin_url = get_origin_url() or f"git@github.com:{upstream_repo}.git"
    upstream_url = infer_repo_git_url(repo=upstream_repo, fallback_url=origin_url)
    fork_url = infer_repo_git_url(repo=fork_repo, fallback_url=origin_url)

    # Fetch into FETCH_HEAD instead of refs/remotes/origin/*. upstream_repo may
    # differ from origin, and origin's tracking refs must not be overwritten.
    # This also avoids non-fast-forward fetch failures after upstream rewrites.
    run_command(["git", "fetch", upstream_url, f"refs/heads/{branch}"])

    cmd = ["git", "push"]
    if force_sync:
        cmd.append("--force")
    cmd.extend([fork_url, f"FETCH_HEAD:refs/heads/{branch}"])
    run_command(cmd)
    print(
        f"Synchronized {fork_repo}:{branch} from {upstream_repo}:{branch} via git push.",
        flush=True,
    )


def build_on_fork(args: argparse.Namespace, *, sync_first: bool) -> dict[str, Any]:
    """Produce release assets for ``args.tag`` on the fork and return the fork release.

    Steps:
    1. When *sync_first* is set, sync the fork branch from upstream.
    2. Create the fork release if it is missing.
    3. Unless the release already has assets, dispatch the workflow, watch the
       run to completion, and wait for the assets to appear.

    Raises:
        CommandError: if the release already has assets and
            ``--allow-existing-fork-assets`` was not given, or any step fails.
    """
    fork_repo = args.fork_repo
    tag = args.tag

    if sync_first:
        sync_fork_branch(
            fork_repo=fork_repo,
            upstream_repo=args.upstream_repo,
            branch=args.branch,
            force_sync=args.force_sync,
        )

    release = ensure_fork_release(fork_repo=fork_repo, tag=tag, branch=args.branch)
    asset_count = len(release.get("assets", []))

    if asset_count and not args.allow_existing_fork_assets:
        raise CommandError(
            f"fork release {fork_repo}@{tag} already has {asset_count} asset(s). "
            "Use --allow-existing-fork-assets to reuse them, or clear the fork release assets first."
        )
    if asset_count:
        print(
            f"Fork release {tag} already has {asset_count} asset(s); "
            "reusing them without dispatching the workflow.",
            flush=True,
        )
        return release

    run_id = dispatch_workflow(
        fork_repo=fork_repo,
        workflow=args.workflow,
        workflow_ref=args.workflow_ref or args.branch,
        tag=tag,
    )
    watch_workflow_run(fork_repo=fork_repo, run_id=run_id)
    return wait_for_release_assets(repo=fork_repo, tag=tag)


def ensure_fork_release(*, fork_repo: str, tag: str, branch: str) -> dict[str, Any]:
    """Return the fork's release for *tag*, creating it if it is missing.

    A newly created release targets *branch*, is titled with the tag, has
    empty notes, and is not marked as a prerelease.

    Raises:
        CommandError: if creation fails or the release is still missing afterwards.
    """
    existing = get_release(repo=fork_repo, tag=tag)
    if existing is None:
        create_release(
            repo=fork_repo,
            tag=tag,
            target=branch,
            title=tag,
            notes="",
            prerelease=False,
        )
        existing = get_release(repo=fork_repo, tag=tag)
        if existing is None:
            raise CommandError(
                f"expected fork release {fork_repo}@{tag} to exist after creation"
            )
    return existing


def dispatch_workflow(*, fork_repo: str, workflow: str, workflow_ref: str, tag: str) -> int:
    """Trigger *workflow* on the fork at *workflow_ref* and return the new run's ID.

    The tag is passed as the raw-field input ``tag``. gh's output is echoed.
    If that output contains a run URL, its ID is used directly. Otherwise
    recent ``workflow_dispatch`` runs are polled to locate the new run.
    """
    dispatched_at = dt.datetime.now(dt.UTC)
    cmd = [
        "gh", "workflow", "run", workflow,
        "--repo", fork_repo,
        "--ref", workflow_ref,
        "--raw-field", f"tag={tag}",
    ]
    completed = run_command(cmd, capture=True)
    pieces = (completed.stdout.strip(), completed.stderr.strip())
    output = "\n".join(piece for piece in pieces if piece)
    if output:
        print(output, flush=True)

    parsed_id = parse_run_id_from_text(output)
    if parsed_id is None:
        return wait_for_recent_workflow_run(
            fork_repo=fork_repo,
            workflow=workflow,
            workflow_ref=workflow_ref,
            started_at=dispatched_at,
        )
    return parsed_id


def parse_run_id_from_text(text: str) -> int | None:
    match = re.search(r"/actions/runs/(\d+)", text)
    if match:
        return int(match.group(1))
    return None


def wait_for_recent_workflow_run(
    *,
    fork_repo: str,
    workflow: str,
    workflow_ref: str,
    started_at: dt.datetime,
) -> int:
    """Poll for the ``workflow_dispatch`` run created around *started_at*.

    Each poll lists up to 20 runs of *workflow* and returns the ID of the first
    one that meets both conditions:
    - it was created no earlier than 5 seconds before *started_at*
      (presumably slack for clock skew with GitHub; TODO confirm);
    - its head branch equals *workflow_ref*.

    Raises:
        CommandError: if no such run appears within RUN_LOOKUP_TIMEOUT_SECONDS.
    """
    earliest = started_at - dt.timedelta(seconds=5)
    deadline = time.monotonic() + RUN_LOOKUP_TIMEOUT_SECONDS
    list_args = [
        "run", "list",
        "--repo", fork_repo,
        "--workflow", workflow,
        "--event", "workflow_dispatch",
        "--json", "databaseId,createdAt,event,headBranch,status,url,workflowName",
        "--limit", "20",
    ]

    while time.monotonic() < deadline:
        candidates = (
            run
            for run in gh_json(list_args)
            if parse_github_timestamp(run["createdAt"]) >= earliest
            and run.get("headBranch") == workflow_ref
        )
        found = next(candidates, None)
        if found is not None:
            return int(found["databaseId"])
        time.sleep(POLL_INTERVAL_SECONDS)

    raise CommandError(
        f"unable to find the workflow run for {fork_repo} after dispatching {workflow}"
    )


def watch_workflow_run(*, fork_repo: str, run_id: int) -> None:
    """Block until run *run_id* on *fork_repo* finishes, streaming compact progress.

    ``--exit-status`` makes gh exit non-zero when the run fails, which
    ``run_command`` turns into a CommandError.
    """
    watch_cmd = ["gh", "run", "watch", str(run_id)]
    watch_cmd += ["--repo", fork_repo, "--compact", "--exit-status"]
    run_command(watch_cmd)


def wait_for_release_assets(*, repo: str, tag: str) -> dict[str, Any]:
    """Poll until the *tag* release on *repo* lists at least one asset, then return it.

    A release that does not exist yet is treated like one without assets.

    Raises:
        CommandError: if no assets appear within RELEASE_ASSET_TIMEOUT_SECONDS.
    """
    give_up_at = time.monotonic() + RELEASE_ASSET_TIMEOUT_SECONDS

    while time.monotonic() < give_up_at:
        release = get_release(repo=repo, tag=tag)
        assets = release.get("assets") if release else None
        if assets:
            print(f"Release {repo}@{tag} has {len(assets)} asset(s).", flush=True)
            return release
        time.sleep(POLL_INTERVAL_SECONDS)

    raise CommandError(
        f"release {repo}@{tag} did not expose assets within "
        f"{RELEASE_ASSET_TIMEOUT_SECONDS} seconds"
    )


def download_fork_assets(args: argparse.Namespace) -> None:
    """Download the fork release's assets for ``args.tag`` into the download dir.

    Waits for the fork release to expose assets, creates the target directory,
    then runs ``gh release download``. Existing local files are overwritten
    when ``args.clobber`` is set and skipped otherwise.
    """
    release = wait_for_release_assets(repo=args.fork_repo, tag=args.tag)
    asset_count = len(release["assets"])
    target_dir = resolve_download_dir(args.tag, getattr(args, "download_dir", None))
    target_dir.mkdir(parents=True, exist_ok=True)

    overwrite_flag = "--clobber" if getattr(args, "clobber", False) else "--skip-existing"
    run_command(
        [
            "gh", "release", "download", args.tag,
            "--repo", args.fork_repo,
            "--dir", str(target_dir),
            overwrite_flag,
        ]
    )
    print(
        f"Downloaded {asset_count} asset(s) from {args.fork_repo}@{args.tag} into {target_dir}.",
        flush=True,
    )


def upload_upstream_assets(args: argparse.Namespace) -> None:
    """Upload the locally downloaded assets for ``args.tag`` to the upstream release.

    Files are taken from the resolved download directory (top level only,
    sorted by path). Hidden files (names starting with ".") such as
    ``.DS_Store`` are skipped so OS/editor artefacts are not published.
    If the upstream release does not exist yet, it is created from the fork
    release's metadata. Matching upstream assets are replaced only when
    ``args.clobber`` is set.

    Raises:
        CommandError: if the directory holds no uploadable files, the fork
            release never exposes assets, or a gh command fails.
    """
    download_dir = resolve_download_dir(args.tag, getattr(args, "download_dir", None))
    asset_files: list[Path] = []
    if download_dir.is_dir():
        asset_files = sorted(
            path
            for path in download_dir.iterdir()
            if path.is_file() and not path.name.startswith(".")
        )
    if not asset_files:
        raise CommandError(
            f"no files were found in {download_dir}; run download-fork-assets first"
        )

    fork_release = wait_for_release_assets(repo=args.fork_repo, tag=args.tag)
    ensure_upstream_release(
        upstream_repo=args.upstream_repo,
        tag=args.tag,
        branch=args.branch,
        fork_release=fork_release,
    )

    cmd = [
        "gh",
        "release",
        "upload",
        args.tag,
        "--repo",
        args.upstream_repo,
    ]
    if getattr(args, "clobber", False):
        cmd.append("--clobber")
    cmd.extend(str(path) for path in asset_files)
    run_command(cmd)
    print(
        f"Uploaded {len(asset_files)} asset(s) from {download_dir} to "
        f"{args.upstream_repo}@{args.tag}.",
        flush=True,
    )


def ensure_upstream_release(
    *,
    upstream_repo: str,
    tag: str,
    branch: str,
    fork_release: dict[str, Any],
) -> dict[str, Any]:
    """Return the upstream release for *tag*, creating it from *fork_release* if missing.

    A newly created release targets *branch* and copies three things from the
    fork release: its name (falling back to the tag), its body, and its
    prerelease flag.

    NOTE(review): if the tag does not exist upstream yet, it is created at the
    current head of *branch*. That may differ from the commit the fork built
    if upstream moved in between. Consider targeting the fork tag's commit
    SHA (TODO confirm).

    Raises:
        CommandError: if creation fails or the release is still missing afterwards.
    """
    current = get_release(repo=upstream_repo, tag=tag)
    if current is not None:
        return current

    title = fork_release.get("name") or tag
    notes = fork_release.get("body") or ""
    is_prerelease = bool(fork_release.get("isPrerelease"))
    create_release(
        repo=upstream_repo,
        tag=tag,
        target=branch,
        title=title,
        notes=notes,
        prerelease=is_prerelease,
    )

    current = get_release(repo=upstream_repo, tag=tag)
    if current is None:
        raise CommandError(
            f"expected upstream release {upstream_repo}@{tag} to exist after creation"
        )
    return current


def create_release(
    *,
    repo: str,
    tag: str,
    target: str,
    title: str,
    notes: str,
    prerelease: bool,
) -> None:
    """Create a release for *tag* on *repo* via ``gh release create``.

    *target* is passed as ``--target`` and *title*/*notes* as ``--title``/``--notes``.
    ``--prerelease`` is added only when *prerelease* is true.
    """
    cmd = ["gh", "release", "create", tag, "--repo", repo]
    cmd.extend(("--target", target, "--title", title, "--notes", notes))
    if prerelease:
        cmd.append("--prerelease")
    run_command(cmd)


def resolve_download_dir(tag: str, override: str | None) -> Path:
    template = (
        override
        or os.environ.get("PLEXOS2DUCKDB_RELEASE_DOWNLOAD_DIR")
        or DEFAULT_DOWNLOAD_DIR_TEMPLATE
    )
    return Path(template.format(tag=tag)).expanduser()


def get_release(*, repo: str, tag: str) -> dict[str, Any] | None:
    """Fetch metadata for the *tag* release on *repo*, or ``None`` if it does not exist.

    The returned dict holds the JSON fields assets, body, isDraft,
    isPrerelease, name, targetCommitish and url.

    Raises:
        CommandError: for a gh failure whose output mentions neither
            "release not found" nor "HTTP 404".
    """
    fields = "assets,body,isDraft,isPrerelease,name,targetCommitish,url"
    cmd = ["gh", "release", "view", tag, "--repo", repo, "--json", fields]
    result = run_command(cmd, capture=True, check=False)
    if result.returncode == 0:
        return json.loads(result.stdout)

    message = (result.stderr or result.stdout).strip().lower()
    if any(marker in message for marker in ("release not found", "http 404")):
        return None
    raise CommandError(command_failure_message(cmd, result))


def gh_json(args: list[str]) -> Any:
    """Run ``gh`` with *args* and return its stdout decoded as JSON.

    *args* must make gh emit JSON (e.g. via ``--json``). A non-zero exit
    raises CommandError through ``run_command``.
    """
    cmd = ["gh"]
    cmd.extend(args)
    completed = run_command(cmd, capture=True)
    return json.loads(completed.stdout)


def parse_github_timestamp(value: str) -> dt.datetime:
    """Parse an ISO-8601 timestamp as returned by the GitHub API.

    Each ``Z`` is rewritten to ``+00:00`` before parsing. The result is
    timezone-aware whenever *value* carries ``Z`` or an explicit offset.
    """
    normalized = value.replace("Z", "+00:00")
    return dt.datetime.fromisoformat(normalized)


def run_command(
    cmd: list[str],
    *,
    capture: bool = False,
    check: bool = True,
    announce: bool = True,
) -> subprocess.CompletedProcess[str]:
    """Run *cmd* without a shell and return the completed process.

    Args:
        cmd: Program followed by its arguments.
        capture: Capture stdout/stderr as text. Otherwise they go to this
            process's own streams.
        check: Raise CommandError when the command exits non-zero.
        announce: Print the shell-quoted command, prefixed with ``+``,
            before running it.

    Returns:
        The CompletedProcess. If the executable cannot be found, a synthetic
        result is returned instead of raising FileNotFoundError. It has exit
        code 127 (the shell's "command not found" status) and an explanatory
        stderr message.

    Raises:
        CommandError: if *check* is true and the command fails or cannot be found.
    """
    if announce:
        print("+", " ".join(shlex.quote(part) for part in cmd), flush=True)
    try:
        result = subprocess.run(
            cmd,
            check=False,
            text=True,
            capture_output=capture,
        )
    except FileNotFoundError:
        # A missing git/gh must not escape as a raw traceback. With check=False,
        # best-effort callers (e.g. default-value inference) can fall back.
        result = subprocess.CompletedProcess(
            cmd,
            127,
            stdout="" if capture else None,
            stderr=f"command not found: {cmd[0]}",
        )
    if check and result.returncode != 0:
        raise CommandError(command_failure_message(cmd, result))
    return result


def command_failure_message(
    cmd: list[str], result: subprocess.CompletedProcess[str]
) -> str:
    """Describe a failed command.

    The message contains the exit code and the shell-quoted command line. It
    is followed by the command's combined output on a new line, if there was
    any.
    """
    headline = f"command failed with exit code {result.returncode}: {shlex.join(cmd)}"
    details = combine_output(result)
    return f"{headline}\n{details}" if details else headline


def combine_output(result: subprocess.CompletedProcess[str]) -> str:
    """Join the stripped stderr and stdout of *result*, stderr first.

    Streams that are missing or contain only whitespace are omitted.
    """
    stripped = ((stream or "").strip() for stream in (result.stderr, result.stdout))
    return "\n".join(text for text in stripped if text)


if __name__ == "__main__":
    # Exit with main()'s status (0 on success, 1 on failure).
    sys.exit(main())
