#!/usr/bin/env python3
# Captured from /mnt/data/maintenance/z-ctk (shell prompt residue removed).
from __future__ import annotations

import argparse
import json
import os
import pathlib
import re
import shlex
import shutil
import stat
import subprocess
import sys
from dataclasses import dataclass
from string import Template


# Directory containing this script, with symlinks resolved.
SCRIPT_DIR = pathlib.Path(__file__).resolve().parent
# Directory holding the unit-file templates read by render_template().
TEMPLATE_DIR = SCRIPT_DIR / "templates"
# Accepted shape for container and container-user names: starts with an ASCII
# alphanumeric, followed by alphanumerics, "_", "." or "-".
NAME_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_.-]*$")
# Known subcommands; normalize_argv() prepends "init" when the first argument
# is not one of these (or a help flag).
COMMANDS = {"init", "ls", "rm"}
# Name of the per-tool subdirectory under the state root (see state_root()).
STATE_DIR_NAME = "z-ctk"


@dataclass(frozen=True)
class ManagedPaths:
    """Immutable bundle of the host paths managed for one container.

    Instances are built by ``managed_paths()`` from the user's home directory
    and the container name.
    """

    # Directory holding the systemd user units (~/.config/systemd/user).
    units_dir: pathlib.Path
    # Directory holding helper scripts and metadata (<state root>/z-ctk).
    helper_dir: pathlib.Path
    # JSON metadata file: <helper_dir>/<name>.json
    metadata: pathlib.Path
    # Container unit file: <units_dir>/<name>.service
    container_unit: pathlib.Path
    # sshd unit file: <units_dir>/<name>-sshd.service
    sshd_unit: pathlib.Path
    # Container helper script: <helper_dir>/<name>-container.sh
    container_helper: pathlib.Path
    # sshd helper script: <helper_dir>/<name>-sshd.sh
    sshd_helper: pathlib.Path


def shell_join(parts: list[str]) -> str:
    """Return *parts* as a single shell-safe command line.

    Every element is passed through ``shlex.quote`` and the quoted words are
    joined with single spaces.
    """
    quoted_words = map(shlex.quote, parts)
    return " ".join(quoted_words)


def render_template(name: str, values: dict[str, str]) -> str:
    """Load the template *name* from ``TEMPLATE_DIR`` and fill in *values*.

    Substitution uses ``string.Template.substitute``, so a placeholder that
    has no entry in *values* raises ``KeyError``.
    """
    template_path = TEMPLATE_DIR / name
    template = Template(template_path.read_text())
    return template.substitute(values)


def write_text(path: pathlib.Path, content: str, mode: int | None = None) -> None:
    """Write *content* to *path*, creating missing parent directories.

    When *mode* is given, the file's permission bits are set to it after the
    content has been written.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    path.write_text(content)
    if mode is None:
        return
    path.chmod(mode)


def unlink_if_exists(path: pathlib.Path) -> None:
    """Remove *path* if it is present; do nothing when it is already gone.

    The removal is attempted directly instead of checking ``path.exists()``
    first: ``exists()`` follows symlinks, so a dangling symlink would be
    reported as missing and left behind, and a separate check also races with
    concurrent deletion.

    Raises:
        OSError: for failures other than the path not existing (for example
            when *path* is a directory or permission is denied).
    """
    try:
        path.unlink()
    except FileNotFoundError:
        # Already absent: that is the state the caller asked for.
        pass


def run(cmd: list[str], check: bool = True, capture: bool = False) -> subprocess.CompletedProcess[str]:
    """Execute *cmd* in text mode and return the completed process.

    Args:
        cmd: Argument vector; no shell is involved.
        check: When true, a non-zero exit raises ``CalledProcessError``.
        capture: When true, stdout and stderr are captured on the result.
    """
    options = {"check": check, "text": True, "capture_output": capture}
    return subprocess.run(cmd, **options)


def require_binary(name: str) -> str:
    """Return the path of executable *name* as found by ``shutil.which``.

    Raises:
        SystemExit: if the executable cannot be located on ``PATH``.
    """
    resolved = shutil.which(name)
    if resolved:
        return resolved
    raise SystemExit(f"missing required binary: {name}")


def state_root(home: pathlib.Path) -> pathlib.Path:
    """Return the base directory for per-user state files.

    A non-empty ``XDG_STATE_HOME`` environment variable wins (with ``~``
    expanded); otherwise the result is ``<home>/.local/state``.
    """
    override = os.environ.get("XDG_STATE_HOME")
    if not override:
        return home / ".local" / "state"
    return pathlib.Path(override).expanduser()


def normalize_argv(argv: list[str]) -> list[str]:
    """Make ``init`` the implicit subcommand.

    *argv* is returned unchanged when it is empty or already starts with a
    known subcommand or a help flag; otherwise a new list with ``"init"``
    prepended is returned.
    """
    if argv and argv[0] not in COMMANDS and argv[0] not in {"-h", "--help"}:
        return ["init", *argv]
    return argv


def split_passthrough(argv: list[str]) -> tuple[list[str], list[str]]:
    """Split *argv* at the first ``--`` separator.

    Returns ``(before, after)`` with the separator itself dropped. When there
    is no separator, *argv* is returned as-is together with an empty list.
    """
    try:
        separator_at = argv.index("--")
    except ValueError:
        return argv, []
    return argv[:separator_at], argv[separator_at + 1 :]


def validate_passthrough(args: list[str]) -> None:
    """Reject passthrough Podman flags that this tool does not allow.

    A token is refused when it equals one of the listed long flags, starts
    with such a flag followed by ``=``, or starts with ``-p``, ``-v`` or
    ``-u``.

    Raises:
        SystemExit: naming the first offending token.
    """
    long_flags = (
        "--mount",
        "--volume",
        "--publish",
        "--name",
        "--device",
        "--user",
        "--userns",
        "--restart",
        "--rm",
        "--replace",
        "--entrypoint",
        "--init",
        "--stop-timeout",
    )
    long_flags_with_value = tuple(f"{flag}=" for flag in long_flags)
    short_prefixes = ("-p", "-v", "-u")

    for token in args:
        rejected = (
            token in long_flags
            or token.startswith(long_flags_with_value)
            or token.startswith(short_prefixes)
        )
        if rejected:
            raise SystemExit(f"unsupported passthrough flag: {token}")


def build_parser() -> argparse.ArgumentParser:
    """Construct the command-line parser with its ``init``, ``ls`` and ``rm`` subcommands.

    A subcommand is mandatory and its name is stored in ``command``. For
    ``init``, exactly one of ``--authorized-keys`` / ``--authorized-keys-file``
    must be supplied.
    """
    root = argparse.ArgumentParser(description="Manage z-ctk rootless Podman containers and SSH user units.")
    commands = root.add_subparsers(dest="command", required=True)

    # init: create a container plus its user units.
    init_cmd = commands.add_parser("init", help="Create and enable a z-ctk container.")
    init_cmd.add_argument("--image", required=True)
    init_cmd.add_argument("--port", type=int, default=39000)
    keys = init_cmd.add_mutually_exclusive_group(required=True)
    keys.add_argument(
        "--authorized-keys",
        dest="authorized_keys_text",
        help="Raw authorized_keys text copied into the container user's SSH authorization file.",
    )
    keys.add_argument(
        "--authorized-keys-file",
        dest="authorized_keys_file",
        help="Path to a host authorized_keys file copied into the container user's SSH authorization file.",
    )
    init_cmd.add_argument("--container-name")
    init_cmd.add_argument(
        "--container-user",
        help="SSH login user created inside the container. Defaults to the current host user.",
    )
    init_cmd.add_argument(
        "--workspace",
        help="Host directory mounted at the container user's home. Advanced use may pass a full Podman --mount spec.",
    )
    init_cmd.add_argument("--no-workspace", action="store_true")
    init_cmd.add_argument("--mount", action="append", default=[], help="Additional Podman --mount specs.")
    init_cmd.add_argument("--device", action="append", default=[], help="Additional Podman --device values.")

    # ls: takes no arguments of its own.
    commands.add_parser("ls", help="List z-ctk containers.")

    # rm: remove one named container or all of them.
    rm_cmd = commands.add_parser("rm", help="Remove a z-ctk container and its user units.")
    rm_cmd.add_argument("container_name", nargs="?", help="Managed container name. Defaults to <current-user>-dev.")
    rm_cmd.add_argument("--all", action="store_true", help="Remove all z-ctk containers.")

    return root


def parse_args(argv: list[str]) -> tuple[argparse.Namespace, list[str]]:
    """Parse *argv* into a namespace plus the arguments that followed ``--``.

    Passthrough arguments are only accepted for ``init``, where they are
    checked by ``validate_passthrough``.

    Raises:
        SystemExit: on argparse errors, on a banned passthrough flag, or when
            passthrough arguments accompany a subcommand other than ``init``.
    """
    ours, passthrough = split_passthrough(normalize_argv(argv))
    args = build_parser().parse_args(ours)
    if args.command == "init":
        validate_passthrough(passthrough)
        return args, passthrough
    if passthrough:
        raise SystemExit(f"unexpected passthrough args for {args.command}: {' '.join(passthrough)}")
    return args, passthrough


def parse_mount_spec(spec: str) -> dict[str, str]:
    """Parse a comma-separated ``key=value`` mount spec into a dict.

    Whitespace around entries, keys and values is stripped. Entries without
    an ``=`` (for example a bare ``rw``) are ignored; a repeated key keeps its
    last value.
    """
    fields: dict[str, str] = {}
    for raw_entry in spec.split(","):
        name, equals, value = raw_entry.strip().partition("=")
        if equals:
            fields[name.strip()] = value.strip()
    return fields


def build_workspace_mount(workspace: str | None, no_workspace: bool, container_user: str) -> tuple[str | None, str]:
    """Work out the workspace mount spec and the container user's home.

    Returns ``(mount_spec, container_home)``:

    * *no_workspace* set: no mount, home is ``/home/<container_user>``.
    * *workspace* is ``None``: ``~/dev-container`` on the host is created if
      needed and bind-mounted read-write at the default home.
    * *workspace* parses as a mount spec (it carries a type, source or target
      key): it is returned untouched and its target becomes the home.
    * otherwise *workspace* is a host directory, which is resolved, created
      if needed, and bind-mounted read-write at the default home.

    Raises:
        SystemExit: if a mount spec was given without a target.
    """
    default_home = f"/home/{container_user}"

    def bind_at_default_home(host_dir: pathlib.Path) -> tuple[str, str]:
        # Ensure the host directory exists, then describe the bind mount.
        host_dir.mkdir(parents=True, exist_ok=True)
        return f"type=bind,src={host_dir},target={default_home},rw", default_home

    if no_workspace:
        return None, default_home
    if workspace is None:
        return bind_at_default_home(pathlib.Path.home() / "dev-container")

    fields = parse_mount_spec(workspace)
    mount_keys = {"type", "src", "source", "target", "destination", "dst"}
    if mount_keys.isdisjoint(fields.keys()):
        # No mount-spec keys present: treat the value as a host directory.
        return bind_at_default_home(pathlib.Path(workspace).expanduser().resolve())

    target = fields.get("target") or fields.get("destination") or fields.get("dst")
    if target:
        return workspace, target
    raise SystemExit("--workspace mount spec must include target=...")


def managed_paths(home: pathlib.Path, container_name: str) -> ManagedPaths:
    """Compute every host path managed for *container_name* under *home*.

    Unit files live in ``<home>/.config/systemd/user``; helper scripts and
    the metadata file live in ``<state root>/z-ctk``.
    """
    units = home / ".config" / "systemd" / "user"
    helpers = state_root(home) / STATE_DIR_NAME

    metadata_file = helpers / f"{container_name}.json"
    container_unit_file = units / f"{container_name}.service"
    sshd_unit_file = units / f"{container_name}-sshd.service"
    container_script = helpers / f"{container_name}-container.sh"
    sshd_script = helpers / f"{container_name}-sshd.sh"

    return ManagedPaths(
        units_dir=units,
        helper_dir=helpers,
        metadata=metadata_file,
        container_unit=container_unit_file,
        sshd_unit=sshd_unit_file,
        container_helper=container_script,
        sshd_helper=sshd_script,
    )


def render_container_helper(
    podman: str,
    name: str,
    image: str,
    port: int,
    mounts: list[str],
    devices: list[str],
    extra: list[str],
) -> str:
    """Render the ``/bin/sh`` helper that creates, starts and stops the container.

    The generated script accepts ``create``, ``start`` or ``stop``:

    * ``create`` exits 0 if the container already exists, otherwise execs
      ``podman create`` with keep-id user namespace, ``--init``, the given
      devices and mounts, ``127.0.0.1:<port>`` published to container port 22,
      the *extra* flags, and ``sleep infinity`` as the command.
    * ``start`` attaches to the container if it is running, else starts it
      attached.
    * ``stop`` stops the container with a 10 second timeout.
    """
    create_cmd = [podman, "create", "--name", name, "--userns", "keep-id", "--init", "--stop-timeout", "5"]
    create_cmd += [word for device in devices for word in ("--device", device)]
    create_cmd += [word for mount in mounts for word in ("--mount", mount)]
    create_cmd += ["--publish", f"127.0.0.1:{port}:22", *extra, image, "sleep", "infinity"]

    quoted_podman = shlex.quote(podman)
    quoted_name = shlex.quote(name)
    create_line = shell_join(create_cmd)
    return f"""#!/bin/sh
set -eu

podman={quoted_podman}
container_name={quoted_name}

create() {{
    if "$podman" container exists "$container_name"; then
        exit 0
    fi
    exec {create_line}
}}

start() {{
    running=$("$podman" inspect -f '{{{{.State.Running}}}}' "$container_name" 2>/dev/null || printf 'false\\n')
    if [ "$running" = "true" ]; then
        exec "$podman" attach "$container_name"
    fi
    exec "$podman" start --attach "$container_name"
}}

stop() {{
    exec "$podman" stop --ignore -t 10 "$container_name"
}}

case "${{1:-}}" in
    create) create ;;
    start) start ;;
    stop) stop ;;
    *) echo "usage: $0 {{create|start|stop}}" >&2; exit 2 ;;
esac
"""


def render_sshd_helper(
    podman: str,
    name: str,
    container_user: str,
    container_uid: int,
    container_gid: int,
    container_home: str,
    authorized_keys_file: pathlib.Path | None,
    authorized_keys_text: str | None,
) -> str:
    """Render the ``/bin/sh`` helper that provisions and runs sshd in the container.

    The generated script accepts ``bootstrap``, ``start`` or ``stop``:

    * ``bootstrap`` requires ``apt-get`` inside the container, installs
      ``openssh-server`` and ``sudo`` when they are missing, creates or
      adjusts the login group/user for the given uid/gid and home, writes a
      passwordless sudoers entry, copies the authorized keys to
      ``/etc/ssh/authorized_keys/<container_user>``, generates host keys,
      writes an sshd config drop-in and validates it with ``sshd -t``.
    * ``start`` kills any sshd recorded in ``/run/sshd.pid`` and then execs
      sshd in the foreground.
    * ``stop`` kills the sshd recorded in ``/run/sshd.pid``.

    Args:
        podman: Path to the podman executable (shell-quoted into the script).
        name: Container name (shell-quoted into the script).
        container_user: Login user; interpolated unquoted into in-container
            paths and the sshd config, so callers must pass a validated name.
        container_uid: Numeric uid for the login user.
        container_gid: Numeric gid for the login user.
        container_home: Home directory of the login user inside the container.
        authorized_keys_file: Host file whose content is copied each time
            ``bootstrap`` runs; takes precedence when not ``None``.
        authorized_keys_text: Literal key text embedded in the script; used
            when *authorized_keys_file* is ``None``.

    Returns:
        The script text, terminated by a newline.
    """
    authorized_keys_path = f"/etc/ssh/authorized_keys/{container_user}"
    sudoers_path = f"/etc/sudoers.d/90-{container_user}"
    # Script header: shell variables shared by all subcommands.
    lines = [
        "#!/bin/sh",
        "set -eu",
        "",
        f"podman={shlex.quote(podman)}",
        f"container_name={shlex.quote(name)}",
        f"container_user={shlex.quote(container_user)}",
        f"container_uid={container_uid}",
        f"container_gid={container_gid}",
        f"container_home={shlex.quote(container_home)}",
    ]
    # Choose how the keys reach the container: stream a host file, or print
    # the text embedded in the script. Either way the data is piped into a
    # root shell in the container that writes authorized_keys_path.
    if authorized_keys_file is not None:
        lines.append(f"authorized_keys_file={shlex.quote(str(authorized_keys_file))}")
        copy_authorized_keys = (
            f"cat \"$authorized_keys_file\" | "
            f"\"$podman\" exec --user root -i \"$container_name\" /bin/sh -lc 'cat >{authorized_keys_path}'"
        )
    else:
        lines.append(f"authorized_keys_text={shlex.quote(authorized_keys_text or '')}")
        copy_authorized_keys = (
            "printf '%s\\n' \"$authorized_keys_text\" | "
            f"\"$podman\" exec --user root -i \"$container_name\" /bin/sh -lc 'cat >{authorized_keys_path}'"
        )
    lines.extend(
        [
            "",
            # stop_sshd: best effort; errors are discarded.
            "stop_sshd() {",
            "    \"$podman\" exec --user root \"$container_name\" /bin/sh -lc 'if [ -f /run/sshd.pid ]; then kill \"$(cat /run/sshd.pid)\"; fi' >/dev/null 2>&1 || true",
            "}",
            "",
            "bootstrap() {",
            # Only apt-based images are supported.
            "    \"$podman\" exec --user root \"$container_name\" /bin/sh -lc 'command -v apt-get >/dev/null 2>&1' >/dev/null 2>&1 || {",
            "        echo \"apt-get is required inside $container_name\" >&2",
            "        exit 1",
            "    }",
            # Collect only the packages that are actually missing.
            "    packages=''",
            "    if ! \"$podman\" exec --user root \"$container_name\" /bin/sh -lc 'test -x /usr/sbin/sshd'; then",
            "        packages=\"$packages openssh-server\"",
            "    fi",
            "    if ! \"$podman\" exec --user root \"$container_name\" /bin/sh -lc 'command -v sudo >/dev/null 2>&1'; then",
            "        packages=\"$packages sudo\"",
            "    fi",
            "    if [ -n \"$packages\" ]; then",
            "        \"$podman\" exec --user root \"$container_name\" /bin/sh -lc \"export DEBIAN_FRONTEND=noninteractive; apt-get update && apt-get install -y --no-install-recommends $packages\"",
            "    fi",
            # The heredoc delimiter is quoted, so the outer shell does not
            # expand anything; the values are re-declared inside the heredoc
            # for the in-container shell.
            "    cat <<'EOF' | \"$podman\" exec --user root -i \"$container_name\" /bin/sh",
            "set -eu",
            f"container_user={shlex.quote(container_user)}",
            f"container_uid={container_uid}",
            f"container_gid={container_gid}",
            f"container_home={shlex.quote(container_home)}",
            # Reuse a group that already owns the gid; otherwise create one
            # named after the user. group_name is assigned but not read
            # anywhere else in the generated script.
            "existing_group=$(getent group \"$container_gid\" | cut -d: -f1 || true)",
            "if [ -n \"$existing_group\" ]; then",
                "    group_name=\"$existing_group\"",
            "elif getent group \"$container_user\" >/dev/null 2>&1; then",
            "    echo \"group $container_user already exists with a different gid\" >&2",
            "    exit 1",
            "else",
            "    groupadd -g \"$container_gid\" \"$container_user\"",
            "    group_name=\"$container_user\"",
            "fi",
            # Refuse to proceed when the uid belongs to a different account.
            "uid_owner=$(getent passwd \"$container_uid\" | cut -d: -f1 || true)",
            "if [ -n \"$uid_owner\" ] && [ \"$uid_owner\" != \"$container_user\" ]; then",
            "    echo \"uid $container_uid already belongs to $uid_owner\" >&2",
            "    exit 1",
            "fi",
            "mkdir -p \"$container_home\" /run/sshd /etc/ssh/authorized_keys /etc/ssh/sshd_config.d /etc/sudoers.d",
            "if id -u \"$container_user\" >/dev/null 2>&1; then",
            "    if [ \"$(id -u \"$container_user\")\" != \"$container_uid\" ]; then",
            "        echo \"user $container_user already exists with a different uid\" >&2",
            "        exit 1",
            "    fi",
            "    usermod -d \"$container_home\" -g \"$container_gid\" -s /bin/bash \"$container_user\"",
            "else",
            "    useradd -M -d \"$container_home\" -s /bin/bash -u \"$container_uid\" -g \"$container_gid\" \"$container_user\"",
            "fi",
            # chown failures on the home directory are deliberately ignored.
            "chown \"$container_uid:$container_gid\" \"$container_home\" >/dev/null 2>&1 || true",
            "chmod 755 /etc/ssh/authorized_keys",
            # Grants the login user passwordless sudo for all commands.
            f"printf '%s ALL=(ALL) NOPASSWD:ALL\\n' \"$container_user\" >{sudoers_path}",
            f"chmod 440 {sudoers_path}",
            "EOF",
            f"    {copy_authorized_keys}",
            # NOTE(review): the key file ends up root:root with mode 600.
            # Confirm that sshd in the target image can still read it when it
            # evaluates keys for the unprivileged login user (sshd may open
            # the file with that user's privileges) -- TODO verify.
            f"    \"$podman\" exec --user root \"$container_name\" /bin/sh -lc 'chmod 600 {authorized_keys_path} && chown root:root {authorized_keys_path} && ssh-keygen -A'",
            # sshd drop-in: key-only login, restricted to the one user.
            "    cat <<'EOF' | \"$podman\" exec --user root -i \"$container_name\" /bin/sh -lc 'cat >/etc/ssh/sshd_config.d/10-rootless-dev.conf'",
            "PermitRootLogin no",
            "PasswordAuthentication no",
            "KbdInteractiveAuthentication no",
            "ChallengeResponseAuthentication no",
            "PubkeyAuthentication yes",
            "AuthorizedKeysFile /etc/ssh/authorized_keys/%u",
            f"AllowUsers {container_user}",
            "PidFile /run/sshd.pid",
            "EOF",
            # Validate the resulting configuration.
            "    \"$podman\" exec --user root \"$container_name\" /usr/sbin/sshd -t",
            "}",
            "",
            # start: clear any previous daemon, then run sshd in the
            # foreground (-D) logging to stderr (-e).
            "start() {",
            "    stop_sshd",
            "    exec \"$podman\" exec --user root \"$container_name\" /usr/sbin/sshd -D -e -o PidFile=/run/sshd.pid",
            "}",
            "",
            "stop() {",
            "    stop_sshd",
            "}",
            "",
            "case \"${1:-}\" in",
            "    bootstrap) bootstrap ;;",
            "    start) start ;;",
            "    stop) stop ;;",
            "    *) echo \"usage: $0 {bootstrap|start|stop}\" >&2; exit 2 ;;",
            "esac",
        ]
    )
    return "\n".join(lines) + "\n"


def linger_enabled(loginctl: str, user: str) -> bool | None:
    """Report whether systemd lingering is enabled for *user*.

    Runs ``loginctl show-user <user> -p Linger``. Returns ``None`` when the
    command fails, otherwise whether its trimmed output ends with ``yes``.
    """
    probe = run([loginctl, "show-user", user, "-p", "Linger"], check=False, capture=True)
    if probe.returncode == 0:
        return probe.stdout.strip().endswith("yes")
    return None


def container_exists(podman: str, container_name: str) -> bool:
    """Return whether ``podman container exists <container_name>`` exits with 0."""
    probe = run([podman, "container", "exists", container_name], check=False)
    return probe.returncode == 0


def container_status(podman: str, container_name: str) -> str:
    """Return the container's ``.State.Status`` as reported by ``podman inspect``.

    Yields ``"missing"`` when the inspect command fails and ``"unknown"``
    when it succeeds but prints nothing.
    """
    inspected = run([podman, "inspect", "-f", "{{.State.Status}}", container_name], check=False, capture=True)
    if inspected.returncode == 0:
        status = inspected.stdout.strip()
        return status if status else "unknown"
    return "missing"


def unit_status(systemctl: str, unit_path: pathlib.Path) -> str:
    """Return the activity state of the user unit stored at *unit_path*.

    Yields ``"missing"`` when the unit file does not exist. Otherwise the
    trimmed output of ``systemctl --user is-active`` is returned; if that
    output is empty, the result is ``"active"`` on exit status 0 and
    ``"inactive"`` on any other status.
    """
    if not unit_path.exists():
        return "missing"
    probe = run([systemctl, "--user", "is-active", unit_path.name], check=False, capture=True)
    reported = probe.stdout.strip()
    if reported:
        return reported
    return "active" if probe.returncode == 0 else "inactive"


def read_metadata(path: pathlib.Path) -> dict[str, object]:
    """Load the JSON metadata stored at *path*.

    Returns:
        ``{}`` when the file does not exist, ``{"legacy": True}`` when the
        content is not valid JSON or is valid JSON that is not an object,
        and the decoded object otherwise.

    The file is read directly instead of being probed with ``exists()``
    first, so a file removed between the check and the read cannot raise.
    Non-object JSON (for example ``[]``) is treated like unparsable content
    because callers use the result as a mapping.
    """
    try:
        raw = path.read_text()
    except FileNotFoundError:
        return {}
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return {"legacy": True}
    if not isinstance(data, dict):
        # Valid JSON but not a mapping: callers would fail on .get().
        return {"legacy": True}
    return data


def list_installation_names(home: pathlib.Path) -> list[str]:
    """Return the sorted names of installations found in the helper directory.

    A name is discovered from either a ``<name>.json`` metadata file or a
    ``<name>-container.sh`` helper script in ``<state root>/z-ctk``. Empty
    names are dropped; a missing directory yields an empty list.
    """
    helper_dir = state_root(home) / STATE_DIR_NAME
    if not helper_dir.exists():
        return []

    helper_suffix = "-container.sh"
    found = {metadata_file.stem for metadata_file in helper_dir.glob("*.json")}
    found.update(script.name[: -len(helper_suffix)] for script in helper_dir.glob("*-container.sh"))
    found.discard("")
    return sorted(found)


def metadata_summary(metadata: dict[str, object], key: str, default: str = "-") -> str:
    """Return ``metadata[key]`` rendered with ``str()``, or *default* if the key is absent."""
    if key in metadata:
        return str(metadata[key])
    return str(default)


def command_init(args: argparse.Namespace, passthrough: list[str]) -> int:
    """Create a managed container: write helpers, units and metadata, then enable the units.

    Args:
        args: Parsed ``init`` arguments from ``build_parser()``.
        passthrough: Extra ``podman create`` flags given after ``--``.

    Returns:
        0 on success.

    Raises:
        SystemExit: when run as root, on invalid names/port/options, when the
            authorized keys are missing or empty, when a required binary is
            absent, or when the container or any managed file already exists.
        subprocess.CalledProcessError: when a ``systemctl --user`` call fails.
    """
    if os.geteuid() == 0:
        raise SystemExit("run this as the target non-root user")

    # Defaults derive from the invoking host user.
    user = os.environ.get("USER") or pathlib.Path.home().name
    container_user = args.container_user or user
    container_name = args.container_name or f"{user}-dev"

    # Both names are later interpolated into file names and shell text, so
    # they are restricted to NAME_RE.
    if not NAME_RE.match(container_user):
        raise SystemExit(f"invalid container user: {container_user}")
    if not NAME_RE.match(container_name):
        raise SystemExit(f"invalid container name: {container_name}")
    if args.port < 1 or args.port > 65535:
        raise SystemExit(f"invalid port: {args.port}")
    if args.no_workspace and args.workspace:
        raise SystemExit("--workspace and --no-workspace may not be used together")

    # Resolve the key source: a non-empty host file, or non-blank inline text.
    authorized_keys_file: pathlib.Path | None = None
    authorized_keys_text: str | None = None
    authorized_keys_source = "inline"
    if args.authorized_keys_file is not None:
        authorized_keys_file = pathlib.Path(args.authorized_keys_file).expanduser().resolve()
        if not authorized_keys_file.is_file():
            raise SystemExit(f"authorized_keys file not found: {authorized_keys_file}")
        if authorized_keys_file.stat().st_size == 0:
            raise SystemExit(f"authorized_keys file is empty: {authorized_keys_file}")
        authorized_keys_source = "file"
    else:
        authorized_keys_text = args.authorized_keys_text
        if authorized_keys_text is None or not authorized_keys_text.strip():
            raise SystemExit("authorized_keys text is empty")

    # Note: this may create the workspace directory on the host before the
    # remaining checks below have run.
    workspace_mount, container_home = build_workspace_mount(args.workspace, args.no_workspace, container_user)
    mounts = list(args.mount)
    if workspace_mount is not None:
        # The workspace mount goes first, ahead of any --mount values.
        mounts.insert(0, workspace_mount)

    podman = require_binary("podman")
    systemctl = require_binary("systemctl")
    loginctl = require_binary("loginctl")

    if container_exists(podman, container_name):
        raise SystemExit(f"container already exists: {container_name}")

    # Refuse to overwrite anything left by a previous installation.
    home = pathlib.Path.home()
    paths = managed_paths(home, container_name)
    if any(path.exists() for path in [paths.container_unit, paths.sshd_unit, paths.container_helper, paths.sshd_helper, paths.metadata]):
        raise SystemExit(f"managed files already exist for {container_name}")

    # Helper scripts are written owner-only (rwx------).
    write_text(
        paths.container_helper,
        render_container_helper(podman, container_name, args.image, args.port, mounts, args.device, passthrough),
        stat.S_IRWXU,
    )
    # The container user is given the invoking user's uid and gid.
    write_text(
        paths.sshd_helper,
        render_sshd_helper(
            podman,
            container_name,
            container_user,
            os.getuid(),
            os.getgid(),
            container_home,
            authorized_keys_file,
            authorized_keys_text,
        ),
        stat.S_IRWXU,
    )
    write_text(
        paths.container_unit,
        render_template("container.service.in", {"container_name": container_name, "container_helper": str(paths.container_helper)}),
    )
    write_text(
        paths.sshd_unit,
        render_template("sshd.service.in", {"container_name": container_name, "container_unit": paths.container_unit.name, "sshd_helper": str(paths.sshd_helper)}),
    )

    # Metadata is what command_ls() reports for this installation.
    metadata = {
        "container_name": container_name,
        "container_user": container_user,
        "image": args.image,
        "port": args.port,
        "container_home": container_home,
        "workspace_mount": workspace_mount or "",
        "authorized_keys_source": authorized_keys_source,
    }
    write_text(paths.metadata, json.dumps(metadata, indent=2, sort_keys=True) + "\n")

    # NOTE(review): if one of these calls fails, the files written above stay
    # in place and a retry stops at "managed files already exist"; there is
    # no rollback here -- confirm that `rm` is the intended recovery path.
    run([systemctl, "--user", "daemon-reload"])
    run([systemctl, "--user", "enable", "--now", paths.container_unit.name])
    run([systemctl, "--user", "enable", "--now", paths.sshd_unit.name])

    print(f"installed {paths.container_unit.name} and {paths.sshd_unit.name}")
    print(f"ssh login user: {container_user}")
    print(f"workspace home: {container_home}")
    # Warn (without failing) when lingering is off or cannot be determined.
    linger = linger_enabled(loginctl, user)
    if linger is False:
        print(f"linger is disabled; run: sudo loginctl enable-linger {user}", file=sys.stderr)
    elif linger is None:
        print("could not determine linger state; check it manually if you need boot persistence", file=sys.stderr)
    return 0


def command_ls() -> int:
    """Print one status line for every z-ctk installation of the current user.

    Each line holds the name followed by ``key=value`` fields for the login
    user, Podman state, both unit states, home, port and image. Prints
    ``no z-ctk containers`` when nothing is installed. Always returns 0.
    """
    home = pathlib.Path.home()
    names = list_installation_names(home)
    if not names:
        print("no z-ctk containers")
        return 0

    podman = require_binary("podman")
    systemctl = require_binary("systemctl")
    for name in names:
        paths = managed_paths(home, name)
        metadata = read_metadata(paths.metadata)
        columns = [
            name,
            f"user={metadata_summary(metadata, 'container_user')}",
            f"podman={container_status(podman, name)}",
            f"container_unit={unit_status(systemctl, paths.container_unit)}",
            f"sshd_unit={unit_status(systemctl, paths.sshd_unit)}",
            f"home={metadata_summary(metadata, 'container_home')}",
            f"port={metadata_summary(metadata, 'port')}",
            f"image={metadata_summary(metadata, 'image')}",
        ]
        print(" ".join(columns))
    return 0


def cleanup_empty_dirs(paths: ManagedPaths) -> None:
    """Remove ``paths.helper_dir`` when it exists and contains no entries."""
    helper_dir = paths.helper_dir
    if not helper_dir.exists():
        return
    for _entry in helper_dir.iterdir():
        # At least one entry remains, so the directory stays.
        return
    helper_dir.rmdir()


def command_rm(args: argparse.Namespace) -> int:
    """Remove one managed container (or all of them) with its units and helper files.

    For every selected name the sshd and container units are disabled and
    stopped, the container is force-removed, and the unit files, helper
    scripts and metadata are deleted. These steps are best effort: failures
    of the ``systemctl`` and ``podman`` calls are ignored.

    Container names are checked against ``NAME_RE`` (the same rule
    ``command_init`` applies) before they are used, because a name is
    interpolated into file paths that get unlinked and is passed to
    ``podman rm``: a value containing ``/`` or starting with ``-`` must not
    get that far.

    Args:
        args: Parsed ``rm`` arguments (``container_name`` and ``all``).

    Returns:
        0 on success, including when ``--all`` finds nothing to remove.

    Raises:
        SystemExit: when run as root, when both a name and ``--all`` are
            given, when the name is invalid, when a required binary is
            missing, or when the named container is not found.
    """
    if os.geteuid() == 0:
        raise SystemExit("run this as the target non-root user")

    user = os.environ.get("USER") or pathlib.Path.home().name
    if args.all and args.container_name:
        raise SystemExit("rm accepts either a container name or --all")

    home = pathlib.Path.home()
    if args.all:
        discovered = list_installation_names(home)
        if not discovered:
            print("no z-ctk containers")
            return 0
        names: list[str] = []
        for candidate in discovered:
            if NAME_RE.match(candidate):
                names.append(candidate)
            else:
                # Discovered from stray files; never hand such a name to podman.
                print(f"skipping invalid container name: {candidate}", file=sys.stderr)
    else:
        requested = args.container_name or f"{user}-dev"
        if not NAME_RE.match(requested):
            raise SystemExit(f"invalid container name: {requested}")
        names = [requested]

    podman = require_binary("podman")
    systemctl = require_binary("systemctl")
    removed: list[str] = []
    for name in names:
        paths = managed_paths(home, name)
        exists = any(
            path.exists()
            for path in [paths.metadata, paths.container_helper, paths.sshd_helper, paths.container_unit, paths.sshd_unit]
        ) or container_exists(podman, name)
        if not exists:
            if args.all:
                continue
            raise SystemExit(f"z-ctk container not found: {name}")

        # Stop sshd before the container it runs in, then drop the container.
        run([systemctl, "--user", "disable", "--now", paths.sshd_unit.name], check=False, capture=True)
        run([systemctl, "--user", "disable", "--now", paths.container_unit.name], check=False, capture=True)
        run([podman, "rm", "-f", "--ignore", name], check=False, capture=True)

        unlink_if_exists(paths.container_unit)
        unlink_if_exists(paths.sshd_unit)
        unlink_if_exists(paths.container_helper)
        unlink_if_exists(paths.sshd_helper)
        unlink_if_exists(paths.metadata)
        cleanup_empty_dirs(paths)
        removed.append(name)

    # Let systemd forget the deleted unit files.
    run([systemctl, "--user", "daemon-reload"], check=False)
    for name in removed:
        print(f"removed {name}")
    return 0


def main() -> int:
    """Parse the process arguments and dispatch to the chosen subcommand.

    Returns the subcommand's exit status.

    Raises:
        SystemExit: if the parsed command has no handler.
    """
    args, passthrough = parse_args(sys.argv[1:])
    handlers = {
        "init": lambda: command_init(args, passthrough),
        "ls": command_ls,
        "rm": lambda: command_rm(args),
    }
    handler = handlers.get(args.command)
    if handler is None:
        raise SystemExit(f"unsupported command: {args.command}")
    return handler()


if __name__ == "__main__":
    raise SystemExit(main())

