# Repository context export

====================================================================================================
FILE: aps_automation_sdk/__init__.py
====================================================================================================


from .classes import (
    Activity,
    ActivityParameter,
    ActivityInputParameter,
    ActivityOutputParameter,
    ActivityJsonParameter,
    AppBundle,
    WorkItem,
    ActivityInputParameterAcc,
    ActivityOutputParameterAcc,
    UploadActivityInputParameter,
    WorkItemAcc
)
from .core import (
    WorkItemPollEvent,
    PollCallback,
)

from .utils import (
    set_nickname,
    get_token,
    get_nickname,
    get_forgeapp_profile,
    upload_public_key,
    delete_activity,
    delete_appbundle,
    create_bucket,
    create_appbundle_version,
    move_or_create_alias,
    publish_appbundle_update
)

from .model_derivative import (
    safe_base64_encode,
    to_md_urn,
    get_revit_version_from_manifest,
    fetch_manifest,
    get_revit_version_from_oss_object,
    start_svf_translation_job,
    get_translation_status,
    translate_file_in_oss,
    get_translation_info,
)

from .signing import (
    generate_key_file,
    export_public_key,
    sign_activity,
)
from .ssa import (
    DEFAULT_SSA_SCOPES,
    SsaConfig,
    build_ssa_jwt,
    exchange_jwt_assertion_for_token,
    get_ssa_3lo_token,
    parse_token_response,
)

__all__ = [
    "Activity",
    "ActivityParameter",
    "ActivityInputParameter",
    "ActivityOutputParameter",
    "ActivityJsonParameter",
    "AppBundle",
    "WorkItem",
    "ActivityInputParameterAcc",
    "ActivityOutputParameterAcc",
    "UploadActivityInputParameter",
    "WorkItemAcc",
    "WorkItemPollEvent",
    "PollCallback",
    "get_token",
    "get_nickname",
    "get_forgeapp_profile",
    "upload_public_key",
    "delete_activity",
    "delete_appbundle",
    "create_bucket",
    "create_appbundle_version",
    "move_or_create_alias",
    "publish_appbundle_update",
    "set_nickname",
    "generate_key_file",
    "export_public_key",
    "sign_activity",
    "DEFAULT_SSA_SCOPES",
    "SsaConfig",
    "build_ssa_jwt",
    "parse_token_response",
    "exchange_jwt_assertion_for_token",
    "get_ssa_3lo_token",
    "safe_base64_encode",
    "to_md_urn",
    "get_revit_version_from_manifest",
    "fetch_manifest",
    "get_revit_version_from_oss_object",
    "start_svf_translation_job",
    "get_translation_status",
    "translate_file_in_oss",
    "get_translation_info",
]

__version__ = "0.1.0"

====================================================================================================
FILE: aps_automation_sdk/acc.py
====================================================================================================

import requests
import urllib.parse
import json
from typing import Any

APS_BASE_URL = "https://developer.api.autodesk.com"
PROJECTS_V1 = f"{APS_BASE_URL}/project/v1"
DATA_V1 = f"{APS_BASE_URL}/data/v1"
DATA_V2 = f"{APS_BASE_URL}/data/v2"
DA_V3 = f"{APS_BASE_URL}/da/us-east/v3"


def bearer(token: str) -> dict[str, str]:
    return {"Authorization": f"Bearer {token}"}


def item_from_version(project_id: str, version_urn: str, token: str) -> str:
    """Get item ID from version URN."""
    if "?version=" not in version_urn:
        raise RuntimeError("version URN must include '?version=N'")
    url = f"{DATA_V1}/projects/{project_id}/versions/{urllib.parse.quote(version_urn, safe='')}/item"
    r = requests.get(url, headers=bearer(token), timeout=30)
    r.raise_for_status()
    data = r.json().get("data", {})
    if data.get("type") != "items" or "id" not in data:
        raise RuntimeError(
            f"Unexpected payload for versions->item, {json.dumps(r.json())[:400]}"
        )
    return data["id"]


def parent_folder_from_item(project_id: str, item_id: str, token: str) -> str:
    """Get parent folder ID from item ID."""
    url = f"{DATA_V1}/projects/{project_id}/items/{urllib.parse.quote(item_id, safe='')}/parent"
    r = requests.get(url, headers=bearer(token), timeout=30)
    r.raise_for_status()
    data = r.json().get("data", {})
    if data.get("type") != "folders" or "id" not in data:
        raise RuntimeError(
            f"Unexpected payload for item->parent, {json.dumps(r.json())[:400]}"
        )
    return data["id"]


def resolve_parent_folder(project_id: str, any_urn: str, token: str) -> str:
    """Resolve parent folder ID from any version URN."""
    item_id = item_from_version(project_id, any_urn, token)
    folder_id = parent_folder_from_item(project_id, item_id, token)
    return folder_id


def get_item_tip_version(
    project_id: str, item_lineage_urn: str, token: str
) -> dict[str, Any]:
    url = f"{DATA_V1}/projects/{project_id}/items/{urllib.parse.quote(item_lineage_urn, safe=':')}/tip"
    r = requests.get(url, headers=bearer(token), timeout=30)
    r.raise_for_status()
    return r.json()


def find_tip_storage_id(tip_payload: dict[str, Any]) -> str:
    """
    Return the storage objectId for the tip version.
    """
    nodes = [tip_payload.get("data", {})] + tip_payload.get("included", [])
    for node in nodes:
        rel = node.get("relationships", {})
        st = rel.get("storage") or {}
        data_rel = st.get("data") or {}
        sid = data_rel.get("id")
        if sid:
            return sid
    raise RuntimeError("No storage id found in tip payload")


def create_storage(project_id: str, folder_urn: str, file_name: str, token: str) -> str:
    """POST /data/v1/projects/{project_id}/storage, returns storage objectId"""
    url = f"{DATA_V1}/projects/{project_id}/storage"
    payload = {
        "jsonapi": {"version": "1.0"},
        "data": {
            "type": "objects",
            "attributes": {"name": file_name},
            "relationships": {
                "target": {"data": {"type": "folders", "id": folder_urn}}
            },
        },
    }
    r = requests.post(
        url,
        headers={**bearer(token), "Content-Type": "application/vnd.api+json"},
        json=payload,
        timeout=30,
    )
    r.raise_for_status()
    storage_id = r.json().get("data", {}).get("id")
    if not storage_id:
        raise RuntimeError("Storage creation returned no id")
    return storage_id


def to_data_url_json(obj: dict) -> str:
    """Convert a dict to a minified JSON data URL for inline payload."""
    return "data:application/json," + urllib.parse.quote(
        json.dumps(obj, separators=(",", ":"))
    )


def find_item_by_name(
    project_id: str, folder_urn: str, file_name: str, token: str
) -> str | None:
    url = f"{DATA_V1}/projects/{project_id}/folders/{folder_urn}/contents"
    r = requests.get(url, headers=bearer(token), timeout=30)
    r.raise_for_status()
    for entry in r.json().get("data", []):
        if (
            entry.get("type") == "items"
            and entry.get("attributes", {}).get("displayName") == file_name
        ):
            return entry.get("id")
    return None


def create_version_for_item(
    project_id: str, item_id: str, file_name: str, storage_id: str, token: str
) -> dict[str, Any]:
    url = f"{DATA_V1}/projects/{project_id}/versions"
    payload = {
        "jsonapi": {"version": "1.0"},
        "data": {
            "type": "versions",
            "attributes": {
                "name": file_name,
                "extension": {
                    "type": "versions:autodesk.bim360:File",
                    "version": "1.0",
                },
            },
            "relationships": {
                "item": {"data": {"type": "items", "id": item_id}},
                "storage": {"data": {"type": "objects", "id": storage_id}},
            },
        },
    }
    r = requests.post(
        url,
        headers={**bearer(token), "Content-Type": "application/vnd.api+json"},
        json=payload,
        timeout=60,
    )
    r.raise_for_status()
    return r.json()


def create_item_with_first_version(
    project_id: str,
    folder_urn: str,
    file_name: str,
    storage_id: str,
    token: str,
    version: int = 1,
) -> dict[str, Any]:
    """
    POST /data/v1/projects/{project_id}/items to create a new Item and first Version
    referencing the storage we prepared and Automation wrote to.
    """
    url = f"{DATA_V1}/projects/{project_id}/items"
    payload = {
        "jsonapi": {"version": "1.0"},
        "data": {
            "type": "items",
            "attributes": {
                "displayName": file_name,
                "extension": {"type": "items:autodesk.bim360:File", "version": "1.0"},
            },
            "relationships": {
                "tip": {"data": {"type": "versions", "id": str(version)}},
                "parent": {"data": {"type": "folders", "id": folder_urn}},
            },
        },
        "included": [
            {
                "type": "versions",
                "id": "1",
                "attributes": {
                    "name": file_name,
                    "extension": {
                        "type": "versions:autodesk.bim360:File",
                        "version": "1.0",
                    },
                },
                "relationships": {
                    "storage": {"data": {"type": "objects", "id": storage_id}}
                },
            }
        ],
    }
    r = requests.post(
        url,
        headers={**bearer(token), "Content-Type": "application/vnd.api+json"},
        json=payload,
        timeout=60,
    )
    r.raise_for_status()
    return r.json()

====================================================================================================
FILE: aps_automation_sdk/classes.py
====================================================================================================

import json
from typing import Literal, Any, Optional
from pydantic import BaseModel, Field, PrivateAttr
from .core import (
    get_signed_s3_upload,
    put_to_signed_url,
    complete_signed_s3_upload,
    build_oss_urn,
    get_signed_s3_download,
    dowload_from_signed_url,
    create_activity,
    create_activity_alias,
    upload_appbundle,
    create_appbundle_alias,
    register_appbundle,
    run_work_item,
    PollCallback,
    poll_workitem_status,
    run_public_work_item,
)
from .utils import create_bucket
from .dsl import RegisterBundleResponse, UploadParameters
from aps_automation_sdk.acc import get_item_tip_version, find_tip_storage_id, create_storage, create_item_with_first_version, create_version_for_item, find_item_by_name

class ActivityParameter(BaseModel):
    name: str
    localName: str
    verb: Literal["get", "put", "post"]
    description: str
    zip: bool = Field(default=False)
    ondemand: bool = Field(default=False)
    required: bool = Field(default=False)

    # storage, optional for JSON params
    bucketKey: Optional[str] = None
    objectKey: Optional[str] = None

    # roles
    is_output: bool = False
    is_engine_input: bool = False

    def oss_keys(self) -> tuple[str, str]:
        if not self.bucketKey or not self.objectKey:
            raise ValueError(f"{self.name}: bucketKey and objectKey are required for OSS operations")
        return self.bucketKey, self.objectKey

    def ensure_bucket(self, token: str) -> None:
        try:
            create_bucket(bucketKey=self.bucketKey, token=token)
        except Exception:
            pass

    def upload_file_to_oss(self, file_path: str, token: str) -> None:
        bucketKey, objectKey = self.oss_keys()
        self.ensure_bucket(token)
        signed = get_signed_s3_upload(bucketKey=bucketKey, objectKey=objectKey, token=token)
        put_to_signed_url(signed_url=signed.urls[0], file_path=file_path)
        complete_signed_s3_upload(bucketKey=bucketKey, objectKey=objectKey, uploadKey=signed.uploadKey, token=token)

    def download_to(self, output_path: str, token: str) -> None:
        if not self.is_output:
            raise ValueError(f"{self.name}: download_to is only valid for output parameters")
        bucketKey, objectKey = self.oss_keys()
        signed = get_signed_s3_download(bucketKey=bucketKey, objectKey=objectKey, token=token)
        dowload_from_signed_url(signed_url=signed["url"], output_path=output_path)

    def generate_oss_urn(self) -> str:
        bucketKey, objectKey = self.oss_keys()
        return build_oss_urn(bucketKey=bucketKey, objectKey=objectKey)

    def to_api_param(self) -> dict[str, Any]:
        return {
            "localName": self.localName,
            "zip": self.zip,
            "ondemand": self.ondemand,
            "verb": self.verb,
            "description": self.description,
            "required": self.required,
        }


class ActivityInputParameter(ActivityParameter):
    is_output: bool = False

    def work_item_arg(self, token: str) -> dict[str, Any]:
        return {
            self.name: {
                "url": self.generate_oss_urn(),
                "verb": self.verb,
                "headers": {"Authorization": f"Bearer {token}"},
            }
        }


class ActivityOutputParameter(ActivityParameter):
    is_output: bool = True

    def work_item_arg(self, _token: str) -> dict[str, Any]:
        return {
            self.name: {
                "url": self.generate_oss_urn(),
                "verb": self.verb,
                "headers": {"Authorization": f"Bearer {_token}"},
            }
        }


class ActivityJsonParameter(ActivityParameter):
    content: dict | None = None

    def work_item_arg(self) -> dict[str, Any]:
        data_str = json.dumps(self.content, separators=(",", ":"))
        return {self.name: {"url": f"data:application/json,{data_str}"}}
    
    def set_content(self, data: dict) -> None:
        self.content = data


class Activity(BaseModel):
    id: str
    parameters: list[ActivityParameter]
    engine: Optional[str] = None
    appbundle_full_name: str
    description: str
    alias: str
    commandLine: Optional[list[str]] = None
    script: str | None = None

    def param_map(self) -> dict[str, dict[str, Any]]:
        return {p.name: p.to_api_param() for p in self.parameters}

    @staticmethod
    def short_appbundle_id(appbundle_full_alias: str) -> str:
        right = appbundle_full_alias.split(".", 1)[-1]
        return right.split("+", 1)[0]

    def set_revit_command_line(self) -> None:
        revit_input = next((p for p in self.parameters if isinstance(p, ActivityInputParameter) and p.is_engine_input), None)
        if revit_input is None:
            raise ValueError("No Revit input parameter marked as engine input")
        appbundle_short_id = self.short_appbundle_id(self.appbundle_full_name)
        self.commandLine = [
            "$(engine.path)\\revitcoreconsole.exe "
            f'/i "$(args[{revit_input.name}].path)" '
            f'/al "$(appbundles[{appbundle_short_id}].path)"'
        ]


    def set_autocad_command_line(self) -> None:
        autocad_input = next((p for p in self.parameters if isinstance(p, ActivityInputParameter) and p.is_engine_input), None)
        if autocad_input is None:
            raise ValueError("No AutoCAD input parameter marked as engine input")
        appbundle_short_id = self.short_appbundle_id(self.appbundle_full_name)
        
        cmd = (
            "$(engine.path)\\accoreconsole.exe "
            f'/i "$(args[{autocad_input.name}].path)" '
            f'/al "$(appbundles[{appbundle_short_id}].path)"'
        )
        
        if self.script:
            cmd += ' /s "$(settings[script].path)"'
        
        self.commandLine = [cmd]

    def to_api_dict(self) -> dict[str, Any]:
        activity_dict =  {
            "id": self.id,
            "commandLine": self.commandLine,
            "parameters": self.param_map(),
            "engine": self.engine,
            "appbundles": [self.appbundle_full_name],
            "description": self.description,
        }
        if self.script:
            activity_dict["settings"] = {"script": self.script}
        return activity_dict

    def deploy(self, token: str) -> None:
        create_activity(token=token, payload=self.to_api_dict())
        create_activity_alias(activity_id=self.id, alias_id=self.alias, version=1, token=token)

class AppBundle(BaseModel):
    appBundleId: str
    engine: str
    alias: str
    zip_path: str
    description: str
    version: int = 0  # updated on deploy

    def register(self, token: str) -> RegisterBundleResponse:
        return register_appbundle(
            appBundleId=self.appBundleId,
            engine=self.engine,
            description=self.description,
            token=token,
        )

    def upload(self, uploadParameters: UploadParameters) -> int:
        return upload_appbundle(
            upload_parameters=uploadParameters,
            zip_path=self.zip_path,
        )

    def create_alias(self, token: str) -> dict:
        return create_appbundle_alias(
            app_id=self.appBundleId,
            alias_id=self.alias,
            version=self.version,
            token=token,
        )

    def deploy(self, token: str) -> int:
        reg = self.register(token)
        self.upload(reg.uploadParameters)
        self.version = int(reg.version)
        self.create_alias(token)
        return self.version

class WorkItem(BaseModel):
    parameters: list[ActivityParameter]
    activity_full_alias: str
    
    def build_arguments(self, token: str) -> dict[str, Any]:
        """Build work item arguments from all parameters."""
        payload = {}
        for param in self.parameters:
            if isinstance(param, ActivityJsonParameter):
                payload |= param.work_item_arg()
            else:
                payload |= param.work_item_arg(token)
        return payload
    
    def run(self, token: str) -> str:
        args = self.build_arguments(token)
        response = run_work_item(
            token=token,
            full_activity_alias=self.activity_full_alias,
            work_item_args=args
        )
        work_item_id = response.get("id")
        if not work_item_id:
            raise RuntimeError("No work item id returned from run_work_item")
        return work_item_id
    
    def poll(
        self,
        work_item_id: str,
        token: str,
        max_wait: int = 600,
        interval: int = 10,
        on_event: PollCallback | None = None,
    ) -> dict[str, Any]:
        return poll_workitem_status(
            work_item_id,
            token,
            max_wait=max_wait,
            interval=interval,
            on_event=on_event,
        )
    
    def execute(
        self,
        token: str,
        max_wait: int = 600,
        interval: int = 10,
        on_event: PollCallback | None = None,
    ) -> dict[str, Any]:
        work_item_id = self.run(token)
        return self.poll(
            work_item_id,
            token,
            max_wait=max_wait,
            interval=interval,
            on_event=on_event,
        )
    

class ActivityInputParameterAcc(ActivityInputParameter):
    linage_urn: str | None = None
    project_id: str | None = None
        
    def get_acc_storage_url(self, token: str) -> str:
        tip_payload = get_item_tip_version(
            project_id= self.project_id,
            item_lineage_urn= self.linage_urn,
            token=token
        )
        acc_storage_url = find_tip_storage_id(tip_payload)
        return acc_storage_url

    def work_item_arg_3lo(self, token_3lo: str) -> dict[str, Any]:
        acc_storage_url = self.get_acc_storage_url(token=token_3lo)
        return {
            self.name: {
                "url": acc_storage_url,
                "verb": self.verb,
                "headers": {"Authorization": f"Bearer {token_3lo}"},
            }
        }

class UploadActivityInputParameter(ActivityInputParameter):
    folder_id: str
    project_id: str
    file_name: str
    file_path: str

    def upload_and_create(self, token: str) -> tuple[str, str]:

        item_id = find_item_by_name(self.project_id, self.folder_id, self.file_name, token)
        # Always create fresh storage for the incoming bytes
        storage_id = create_storage(project_id=self.project_id, folder_urn=self.folder_id, file_name=self.file_name, token=token)
        bucket_key, object_key = storage_id.split("urn:adsk.objects:os.object:")[1].split("/", 1)
        signed = get_signed_s3_upload(bucketKey=bucket_key, objectKey=object_key, token=token)
        print("**"*20)
        print(f"{signed=}")
        put_to_signed_url(signed_url=signed.urls[0], file_path=self.file_path)
        complete_signed_s3_upload(bucketKey=bucket_key, objectKey=object_key, uploadKey=signed.uploadKey, token=token)

        if item_id:
            # Create a new version on the existing item
            _ = create_version_for_item(
                project_id=self.project_id,
                item_id=item_id,
                file_name=self.file_name,
                storage_id=storage_id,
                token=token,
            )
            lineage_urn = item_id
            acc_storage_url = storage_id
            return acc_storage_url, lineage_urn

        # First version on a brand new item
        resp = create_item_with_first_version(
            project_id=self.project_id,
            folder_urn=self.folder_id,
            file_name=self.file_name,
            storage_id=storage_id,
            token=token,
        )
        lineage_urn = resp["data"]["id"]
        acc_storage_url = storage_id
        return acc_storage_url, lineage_urn

    def work_item_arg_3lo(self, token_3lo: str) -> dict[str, Any]:
        acc_storage_url, lineage_urn = self.upload_and_create(token_3lo)
        return {
            self.name: {
                "url": acc_storage_url,
                "verb": self.verb,
                "headers": {"Authorization": f"Bearer {token_3lo}"},
            }
        }


class ActivityOutputParameterAcc(ActivityOutputParameter):
    folder_id: str
    project_id: str
    file_name: str
    _storage_id: Optional[str] = PrivateAttr(default=None)
    _item_lineage_urn: Optional[str] = PrivateAttr(default=None)

    def work_item_arg_3lo(self, token_3lo: str) -> dict[str, Any]:
     storage_id = create_storage(project_id=self.project_id, folder_urn=self.folder_id, file_name=self.file_name, token=token_3lo)
     self._storage_id = storage_id
     return {
            self.name: {
                "url": self._storage_id,
                "verb": self.verb,
                "headers": {"Authorization": f"Bearer {token_3lo}"},
            }
        }
    
    def create_acc_item(self, token: str):
        if not self._storage_id:
           raise RuntimeError("No storage have being creaded")
           
        resp = create_item_with_first_version(
            project_id=self.project_id,
            folder_urn=self.folder_id,
            file_name=self.file_name,
            storage_id=self._storage_id,
            token=token
        )
        self._item_lineage_urn = resp["data"]["id"]
        return resp

    def get_lineage_urn(self) -> str:
        if not self._item_lineage_urn:
            raise RuntimeError("No ACC item lineage urn available")
        return self._item_lineage_urn

    def resolve_storage_id(self, token: str) -> str:
        if self._storage_id:
            return self._storage_id

        if not self._item_lineage_urn:
            raise RuntimeError("No ACC storage or lineage urn available for download")

        tip_payload = get_item_tip_version(
            project_id=self.project_id,
            item_lineage_urn=self._item_lineage_urn,
            token=token,
        )
        self._storage_id = find_tip_storage_id(tip_payload)
        return self._storage_id

    def download_to(self, output_path: str, token: str) -> None:
        storage_id = self.resolve_storage_id(token)
        bucket_key, object_key = storage_id.split("urn:adsk.objects:os.object:")[1].split("/", 1)
        self.bucketKey = bucket_key
        self.objectKey = object_key
        super().download_to(output_path, token)

class WorkItemAcc(WorkItem):

    def build_arguments_3lo(self, token3lo: str) ->dict[str, Any]:
        payload: dict[str, Any] = {}
        for param in self.parameters:
            if isinstance(param, ActivityJsonParameter):
                payload |= param.work_item_arg()
            elif isinstance(param, (ActivityInputParameterAcc, ActivityOutputParameterAcc, UploadActivityInputParameter)):
                payload |= param.work_item_arg_3lo(token3lo)
            else:
                raise TypeError(
                    f"Parameter '{param.name}' must be ActivityInputParameterAcc or ActivityOutputParameterAcc for 3LO"
                )
        return payload

    def run_public_activity(self, token3lo: str, activity_signature: str):
        args = self.build_arguments_3lo(token3lo)
        response = run_public_work_item(
            token=token3lo,
            full_activity_alias=self.activity_full_alias,
            work_item_args=args,
            signature=activity_signature,
        )
        workitem_id = response.get("id")
        if not workitem_id:
            raise RuntimeError("No work item id returned from run_public_work_item")

        return workitem_id

    def execute(
        self,
        token: str,
        max_wait: int = 600,
        interval: int = 10,
        on_event: PollCallback | None = None,
        *,
        activity_signature: str,
    ) -> dict[str, Any]:
        workitem_id = self.run_public_activity(token3lo=token, activity_signature=activity_signature)
        return poll_workitem_status(
            workitem_id=workitem_id,
            token=token,
            max_wait=max_wait,
            interval=interval,
            on_event=on_event,
        )

====================================================================================================
FILE: aps_automation_sdk/cli.py
====================================================================================================

import argparse
import getpass
import json
import os
from pathlib import Path
from typing import Annotated, Any

from dotenv import load_dotenv

from aps_automation_sdk.signing import export_public_key, generate_key_file, sign_activity
from aps_automation_sdk.utils import (
    get_forgeapp_profile,
    get_token,
    set_nickname,
    upload_public_key,
)


def resolve_client_credentials() -> Annotated[tuple[str, str], "Resolved APS client id and client secret"]:
    """
    Resolve ``CLIENT_ID`` and ``CLIENT_SECRET`` from environment variables or
    interactive prompts.

    Raises:
        ValueError: If either credential remains empty after prompting.
    """
    client_id = os.getenv("CLIENT_ID", "").strip()
    client_secret = os.getenv("CLIENT_SECRET", "").strip()

    if not client_id:
        client_id = input("APS CLIENT_ID: ").strip()
    if not client_secret:
        client_secret = getpass.getpass("APS CLIENT_SECRET: ").strip()

    if not client_id or not client_secret:
        raise ValueError("Missing CLIENT_ID or CLIENT_SECRET.")

    return client_id, client_secret


def resolve_token_from_credentials() -> Annotated[str, "2-legged OAuth access token"]:
    """Obtain a 2-legged OAuth bearer token using resolved client credentials."""
    client_id, client_secret = resolve_client_credentials()
    return get_token(client_id=client_id, client_secret=client_secret)


def run_signing_generate(args: Annotated[argparse.Namespace, "Parsed CLI arguments"]) -> Annotated[int, "Process exit code"]:
    """
    Generate a new RSA private/public key pair and save the private key to a JSON file.

    The output file contains the key material in Autodesk-compatible JSON format.
    Keep this file secret — **never commit it to version control**.
    """
    generate_key_file(keyfile=args.keyfile, key_size=args.key_size)
    print(f"Private key created at: {Path(args.keyfile).resolve()}")
    return 0


def run_signing_export(args: Annotated[argparse.Namespace, "Parsed CLI arguments"]) -> Annotated[int, "Process exit code"]:
    """
    Export the public key from a private key JSON file into a separate public key JSON file.

    The output file contains only the ``Exponent`` and ``Modulus`` fields required by
    ``PATCH /forgeapps/me``. It is safe to share and can be committed.
    """
    export_public_key(keyfile=args.keyfile, pubkeyfile=args.pubkeyfile)
    print(f"Public key created at: {Path(args.pubkeyfile).resolve()}")
    return 0


def run_signing_sign(args: Annotated[argparse.Namespace, "Parsed CLI arguments"]) -> Annotated[int, "Process exit code"]:
    """
    Sign a full activity ID using RSA PKCS#1 v1.5 SHA-256 and print the Base64 signature.

    The activity ID must be the fully-qualified form: ``nickname.ActivityName+alias``.
    Pass the printed signature to ``WorkItemAcc.run_public_activity(activity_signature=...)``.
    """
    signature = sign_activity(keyfile=args.keyfile, activity_id=args.activity_id)
    print(signature)
    return 0


def run_public_key_info(args: Annotated[argparse.Namespace, "Parsed CLI arguments"]) -> Annotated[int, "Process exit code"]:
    """
    Fetch and print the current ``GET /forgeapps/me`` profile as JSON.

    Use this to verify the registered nickname and confirm the public key was
    uploaded successfully after running ``public-key upload``.
    """
    token = resolve_token_from_credentials()
    profile = get_forgeapp_profile(token=token)
    print(json.dumps(profile, indent=2))
    return 0


def load_public_key(
    pubkeyfile: Annotated[str, "Path to public key JSON file"],
) -> Annotated[dict[str, Any], "Parsed public key payload"]:
    """
    Read and lightly validate a public key JSON file.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If the payload is not a non-empty JSON object.
    """
    path = Path(pubkeyfile)
    if not path.exists():
        raise FileNotFoundError(f"Public key file not found: {pubkeyfile}")
    payload = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(payload, dict) or not payload:
        raise ValueError("Public key JSON must be a non-empty object")
    return payload


def run_public_key_upload(args: Annotated[argparse.Namespace, "Parsed CLI arguments"]) -> Annotated[int, "Process exit code"]:
    """
    Upload a public key JSON to ``PATCH /forgeapps/me`` so APS can verify signed workitems.

    If ``--nickname`` is provided, the APS app nickname is **registered/changed** first via
    ``PATCH /forgeapps/me`` before the key is uploaded. A 409 response on nickname means the
    app already has DA resources and the existing nickname is kept instead.
    """
    token = resolve_token_from_credentials()
    if args.nickname:
        applied = set_nickname(token=token, nickname=args.nickname)
        print(f"Nickname set to: {applied}")

    public_key = load_public_key(args.pubkeyfile)
    response = upload_public_key(token=token, public_key=public_key)
    print(json.dumps(response, indent=2))
    return 0


def build_parser() -> Annotated[argparse.ArgumentParser, "Configured CLI parser"]:
    """
    Build and return the ``aps-automation`` ArgumentParser.

    Registers all subcommands and attaches handler functions via ``set_defaults``.
    """
    parser = argparse.ArgumentParser(
        prog="aps-automation",
        description="APS Automation SDK CLI — manage signing keys, upload public keys, and sign activity IDs for APS Design Automation public activities.",
    )
    subparsers = parser.add_subparsers(dest="command")

    signing_parser = subparsers.add_parser(
        "signing",
        help="RSA key generation and activity-ID signing utilities.",
    )
    signing_subparsers = signing_parser.add_subparsers(dest="signing_command")

    generate_parser = signing_subparsers.add_parser(
        "generate",
        help="Generate a new RSA private key JSON file (keep secret, never commit).",
    )
    generate_parser.add_argument("--keyfile", required=True, help="Path where the private key JSON will be saved.")
    generate_parser.add_argument("--key-size", type=int, default=2048, help="RSA key size in bits (default: 2048).")
    generate_parser.set_defaults(func=run_signing_generate)

    export_parser = signing_subparsers.add_parser(
        "export",
        help="Extract the public key (Exponent + Modulus) from a private key JSON into a separate file for upload.",
    )
    export_parser.add_argument("--keyfile", required=True, help="Path to the private key JSON file.")
    export_parser.add_argument("--pubkeyfile", required=True, help="Path where the public key JSON will be saved.")
    export_parser.set_defaults(func=run_signing_export)

    sign_parser = signing_subparsers.add_parser(
        "sign",
        help="Sign a fully-qualified activity ID (nickname.Activity+alias) and print the Base64 signature.",
    )
    sign_parser.add_argument("--keyfile", required=True, help="Path to the private key JSON file.")
    sign_parser.add_argument(
        "--activity-id",
        required=True,
        help="Full activity ID to sign, e.g. myNickname.MyActivity+prod.",
    )
    sign_parser.set_defaults(func=run_signing_sign)

    public_key_parser = subparsers.add_parser(
        "public-key",
        help="Manage the public key registered on your APS app (PATCH/GET /forgeapps/me).",
    )
    public_key_subparsers = public_key_parser.add_subparsers(dest="public_key_command")

    info_parser = public_key_subparsers.add_parser(
        "info",
        help="Print the current forgeapps/me profile (nickname + registered public key).",
    )
    info_parser.set_defaults(func=run_public_key_info)

    upload_parser = public_key_subparsers.add_parser(
        "upload",
        help="Upload a public key JSON to forgeapps/me so APS can verify signed workitems. Use --nickname to register/change the app nickname at the same time.",
    )
    upload_parser.add_argument("--pubkeyfile", required=True, help="Path to the public key JSON file to upload.")
    upload_parser.add_argument(
        "--nickname",
        help="Register or change the APS app nickname before uploading the key (PATCH /forgeapps/me).",
    )
    upload_parser.set_defaults(func=run_public_key_upload)

    return parser


def cli() -> Annotated[int, "Process exit code"]:
    """
    Main CLI entry-point invoked by the ``aps-automation`` console script.

    Returns 1 if no subcommand is given, 2 on handler errors.
    """
    load_dotenv(override=False)
    parser = build_parser()
    args = parser.parse_args()
    if not hasattr(args, "func"):
        parser.print_help()
        return 1
    try:
        return args.func(args)
    except Exception as exc:
        parser.exit(status=2, message=f"Error: {exc}\n")


if __name__ == "__main__":
    raise SystemExit(cli())

====================================================================================================
FILE: aps_automation_sdk/core.py
====================================================================================================

import requests
import time
import logging
import os
from dataclasses import dataclass
from typing import Annotated, Any, Callable, Literal
from dotenv import load_dotenv
from .dsl import RegisterBundleResponse, UploadParameters, GetSignedS3UrlsResponse, CompleteUploadRequest

load_dotenv()

APS_BASE_URL = "https://developer.api.autodesk.com"
OSS_V2_BASE_URL = f"{APS_BASE_URL}/oss/v2"
OSS_V4_BASE_URL = f"{APS_BASE_URL}/oss/v4"
MD_BASE_URL = f"{APS_BASE_URL}/modelderivative/v2" 
DA_BASE_URL = f"{APS_BASE_URL}/da/us-east/v3" 
AUTH_URL = f"{APS_BASE_URL}/authentication/v2/token"

KnownWorkItemStatus = Literal[
    "pending",
    "inprogress",
    "success",
    "failedDownload",
    "failedInstructions",
    "failedUpload",
    "failedLimitProcessingTime",
    "cancelled",
]
PollCallback = Callable[["WorkItemPollEvent"], None]


@dataclass(frozen=True)
class WorkItemPollEvent:
    workitem_id: str
    status: str
    elapsed_seconds: int
    max_wait_seconds: int
    report_url: str | None
    is_terminal: bool

def get_nickname(token: str) -> str:
    """
    Get the nickname (owner/qualifier) for the current APS account.
    This is the prefix used for AppBundles and Activities.
    Returns the nickname string directly.
    """
    url = f"{DA_BASE_URL}/forgeapps/me"
    r = requests.get(
        url,
        headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        timeout=30,
    )
    r.raise_for_status()
    # The API returns a JSON object with nickname and publicKey
    # Example response: {"nickname": "viktortest", "publicKey": {...}}
    response_data = r.json()
    return response_data.get("nickname", response_data)

def get_signed_s3_upload(
        bucketKey: Annotated[str, "Unique Name you assign to a bucket, Possible values: -_.a-z0-9 (between 3-128 characters in length"],
        objectKey: Annotated[str, "URL-encoded object key to create signed URL for, basicallythenameofthefile"],
        token: Annotated[str, "2Lo Token"]
)->GetSignedS3UrlsResponse:
    """
    We need to check the encoded url part of this!
    """
    url = f"{OSS_V2_BASE_URL}/buckets/{bucketKey}/objects/{objectKey}/signeds3upload"
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    r = requests.get(url, headers=headers, timeout=30)
    r.raise_for_status()
    response = r.json()
    return GetSignedS3UrlsResponse.model_validate(response)

def put_to_signed_url(signed_url: str, file_path: str) -> int:
    """
    Returns HTTP status code, 200 or 201 indicates success
    """
    with open(file_path, "rb") as f:
        r = requests.put(
            signed_url,
            data=f,
            headers={"Content-Type": "application/octet-stream"},
            timeout=120
        )
        r.raise_for_status()
        return r.status_code

def complete_signed_s3_upload(
        bucketKey: Annotated[str, "Unique Name of the bocket"],
        objectKey: Annotated[str, "URL-encoded object key to create signed URL for, basicallythenameofthefile"],
        uploadKey: Annotated[str, "UploadKey "],
        token: Annotated[str, "2Lo Token"]
    ) -> CompleteUploadRequest:
    url = f"{OSS_V2_BASE_URL}/buckets/{bucketKey}/objects/{objectKey}/signeds3upload"
    payload = {"uploadKey": uploadKey}
    header = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    r = requests.post(url, headers=header, json=payload, timeout=30)
    r.raise_for_status()
    return CompleteUploadRequest.model_validate(r.json())

def build_oss_urn(
        bucketKey:Annotated[str, "Unique Name of the bucket"],
        objectKey: Annotated[str, "URL-encode object key"]
    ) -> str:
    return f"urn:adsk.objects:os.object:{bucketKey}/{objectKey}"


def register_appbundle(
        appBundleId: Annotated[str, "Name of AppBundle Only alphanumeric characters and _ (underscore) are allowed."],
        engine: Annotated[str, "Engine to be use in the Automation api e.g'Autodesk.Revit+2021'"],
        description: Annotated[str, "App bundle description"],
        token: Annotated[str, "2Lo Token"]
)->RegisterBundleResponse:
    url = f"{DA_BASE_URL}/appbundles" 
    payload = {"id": appBundleId, "engine": engine, "description":description}
    header = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    r = requests.post(url,headers=header, json=payload)
    r.raise_for_status()
    return RegisterBundleResponse(**r.json())

def upload_appbundle(upload_parameters: UploadParameters, zip_path: str) -> Annotated[int, "Status Code e.g 200"]:
    with open(zip_path, "rb") as f:
        files = {**upload_parameters.formData.model_dump(by_alias=True, exclude_none=True), 'file': (os.path.basename(zip_path), f, "application/octet-stream")}
        r = requests.post(upload_parameters.endpointURL, files=files, timeout=60)
    r.raise_for_status()
    return r.status_code

def create_appbundle_alias(
    app_id: str, alias_id: str, version: int, token: str
) -> dict[str, Any]:
    """
    Create an alias for an AppBundle version.
    """
    url = f"{DA_BASE_URL}/appbundles/{app_id}/aliases"
    payload = {"version": version, "id": alias_id}
    header = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    r = requests.post(
        url,
        headers=header,
        json=payload,
        timeout=30,
    )
    r.raise_for_status()
    return r.json()

def get_signed_s3_download(
        bucketKey: Annotated[str, "Unique name of the bucket"],
        objectKey: Annotated[str, "URL-encoded object key to create signed URL for, basicallythenameofthefile"],
        token: Annotated[str, "2Lo Token"]
) -> None:
    url = f"{OSS_V2_BASE_URL}/buckets/{bucketKey}/objects/{objectKey}/signeds3download"
    header = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    r = requests.get(url=url, headers=header, timeout=30)
    r.raise_for_status()
    return r.json()

def dowload_from_signed_url(
        signed_url: Annotated[str, "Signed url from the previos"],
        output_path: Annotated[str, "Output path str"],
)-> int:
    """
    """
    r = requests.get(signed_url, timeout=120)
    r.raise_for_status()
    
    with open(output_path, "wb") as f:
        f.write(r.content)
    
    return r.status_code
        
def create_activity_alias(
    activity_id: str, alias_id: str, version: int, token: str
) -> dict[str, Any]:
    """
    Create an alias for an Activity version.
    """
    url = f"{DA_BASE_URL}/activities/{activity_id}/aliases"
    payload = {"version": version, "id": alias_id}
    r = requests.post(
        url,
        headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        json=payload,
        timeout=30,
    )
    r.raise_for_status()
    return r.json()


def create_activity(
    token: str,
    payload: dict
) -> dict:
    url = f"{DA_BASE_URL}/activities"
    r = requests.post(url, headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"}, json=payload, timeout=30)
    
    if r.status_code != 200:
        print(f" Error found: {r.text=}")
    
    r.raise_for_status()
    return r.json()


def run_work_item(token: str, full_activity_alias: str, work_item_args: dict[str,Any]):
    url = f"{DA_BASE_URL}/workitems"
    payload = {
        "activityId": full_activity_alias,
        "arguments": work_item_args 
    }
    r = requests.post(url, headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"}, json=payload, timeout=30)
    
    if r.status_code != 200:
        print(f" Error found: {r.text=}")
    
    r.raise_for_status()
    return r.json()

def run_public_work_item(token: str, full_activity_alias: str, work_item_args: dict[str,Any], signature: str):
    url = f"{DA_BASE_URL}/workitems"
    payload = {
        "activityId": full_activity_alias,
        "arguments": work_item_args,
        "signatures": {
            "activityId": signature,
            "workItem": signature
        }
    }
    import pprint
    pprint.pp(f"{payload=}")
    r = requests.post(url, headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json","x-ads-workitem-signature": signature}, json=payload, timeout=30)
    
    if r.status_code != 200:
        print(f" Error found: {r.text=}")
    
    r.raise_for_status()
    return r.json()



def get_workitem_status(workitem_id: str, token: str) -> dict[str, Any]:
    """
    Get the current status and report URL for a WorkItem.
    """
    url = f"{DA_BASE_URL}/workitems/{workitem_id}"
    r = requests.get(
        url,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        timeout=30,
    )
    r.raise_for_status()
    return r.json()


def is_terminal_workitem_status(status: str) -> bool:
    normalized = status.strip().lower()
    return normalized == "success" or normalized == "cancelled" or normalized.startswith("failed")


def poll_workitem_status(
    workitem_id: str,
    token: str,
    max_wait: int = 600,
    interval: int = 10,
    on_event: PollCallback | None = None,
) -> dict[str, Any]:
    elapsed = 0
    logging.info("Polling work item status, id=%s", workitem_id)

    last_status = ""
    status_resp = {}
    
    while elapsed <= max_wait:
        status_resp = get_workitem_status(workitem_id, token)
        last_status = str(status_resp.get("status", ""))
        report_url = status_resp.get("reportUrl")
        is_terminal = is_terminal_workitem_status(last_status)

        if on_event:
            on_event(
                WorkItemPollEvent(
                    workitem_id=workitem_id,
                    status=last_status,
                    elapsed_seconds=elapsed,
                    max_wait_seconds=max_wait,
                    report_url=report_url,
                    is_terminal=is_terminal,
                )
            )

        logging.info("[%3ds] status=%s report_url=%s", elapsed, last_status, report_url)
        if is_terminal:
            report = status_resp.get("reportUrl")
            if report:
                logging.info("Report URL: %s", report)
            break
        if elapsed >= max_wait:
            break
        time.sleep(interval)
        elapsed += interval
    
    return status_resp

====================================================================================================
FILE: aps_automation_sdk/dsl.py
====================================================================================================

from pydantic import BaseModel, Field, ConfigDict

class FormData(BaseModel):
    model_config = ConfigDict(extra="allow", populate_by_name=True)

    key: str
    policy: str
    success_action_status: str = Field(alias="success_action_status")
    success_action_redirect: str = Field(alias="success_action_redirect")

    # Exactly as in the sample: "content-type"
    content_type: str | None = Field(default=None, alias="content-type")

    x_amz_signature: str | None = Field(default=None, alias="x-amz-signature")
    x_amz_credential: str | None = Field(default=None, alias="x-amz-credential")
    x_amz_algorithm: str | None = Field(default=None, alias="x-amz-algorithm")
    x_amz_date: str | None = Field(default=None, alias="x-amz-date")
    x_amz_server_side_encryption: str | None = Field(default=None, alias="x-amz-server-side-encryption")
    x_amz_security_token: str | None = Field(default=None, alias="x-amz-security-token")


class UploadParameters(BaseModel):
    endpointURL: str 
    formData: FormData


class RegisterBundleResponse(BaseModel):
    uploadParameters: UploadParameters
    id: str
    engine: str
    description: str | None = Field(default=None)
    version: int
    
class GetSignedS3UrlsResponse(BaseModel):
    model_config = ConfigDict(extra="ignore", populate_by_name=True)
    uploadKey: str
    urls: list[str]
    urlExpiration: str | None = None
    uploadExpiration: str | None = None
    # Only present when the URL request failed
    status: str | None = None
    reason: str | None = None


class CompleteUploadRequest(BaseModel):
  bucketKey:str 
  objectId:str 
  objectKey:str
  size: int
  contentType: str
  location: str

class GetDownloadS3Url(BaseModel):
    status: str
    url: str
    params: dict
    size: int
    sha1: str

====================================================================================================
FILE: aps_automation_sdk/model_derivative.py
====================================================================================================

import os
import base64
import requests
import time
from typing import Any

# APS API endpoints
APS_BASE_URL = "https://developer.api.autodesk.com"
MD_BASE_URL = f"{APS_BASE_URL}/modelderivative/v2"
AUTH_URL = f"{APS_BASE_URL}/authentication/v2/token"


def safe_base64_encode(text: str) -> str:
    """Encode text to URL-safe base64 format (used for URNs)."""
    return base64.urlsafe_b64encode(text.encode()).decode().strip("=")


def to_md_urn(wip_urn: str) -> str:
    """Convert WIP URN to Model Derivative URN."""
    raw = wip_urn.split("?", 1)[0]
    encoded = base64.urlsafe_b64encode(raw.encode("utf8")).decode("utf8")
    return encoded.rstrip("=")


def get_revit_version_from_manifest(manifest: dict) -> str | None:
    """Extract Revit version from manifest."""
    try:
        derivatives = manifest.get("derivatives", [])
        if not derivatives:
            return None
        
        for derivative in derivatives:
            properties = derivative.get("properties", {})
            doc_info = properties.get("Document Information", {})
            rvt_version = doc_info.get("RVTVersion")
            if rvt_version:
                return str(rvt_version)
        
        return None
    except Exception as e:
        print(f"Error extracting Revit version from manifest: {e}")
        return None


def fetch_manifest(token: str, object_urn: str) -> dict:
    """Fetch model derivative manifest."""
    response = requests.get(
        f"{MD_BASE_URL}/designdata/{object_urn}/manifest",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30
    )
    response.raise_for_status()
    return response.json()


def get_revit_version_from_oss_object(token: str, bucket_key: str, object_key: str) -> str | None:
    """
    Get Revit version from an OSS object by translating it and checking the manifest.
    Revit version string (e.g., "2024") or None if not detected
    """
    print(f"🔍 Detecting Revit version for object: {object_key}")
    
    # Build the OSS object URN
    oss_object_id = f"urn:adsk.objects:os.object:{bucket_key}/{object_key}"
    object_urn = safe_base64_encode(oss_object_id)
    
    # Start a basic translation job to get the manifest
    try:
        start_svf_translation_job(token, object_urn)
        
        # Wait a bit for the manifest to be generated
        import time
        max_wait = 60
        interval = 5
        elapsed = 0
        
        while elapsed < max_wait:
            try:
                manifest = fetch_manifest(token, object_urn)
                version = get_revit_version_from_manifest(manifest)
                
                if version:
                    print(f"✅ Detected Revit version: {version}")
                    return version
                    
                # If manifest exists but no version yet, wait a bit more
                if manifest.get("status") in ["success", "failed"]:
                    break
                    
            except requests.exceptions.RequestException:
                pass
            
            time.sleep(interval)
            elapsed += interval
        
        print("⚠️  Could not detect Revit version from manifest")
        return None
        
    except Exception as e:
        print(f"⚠️  Error detecting Revit version: {e}")
        return None


def start_svf_translation_job(token: str, object_urn: str) -> dict[str, Any]:
    """
    Start SVF translation job for the Revit file generated by Design Automation.
    """
    print("🔄 Starting Model Derivative translation job...")
    
    job_payload = {
        "input": {"urn": object_urn},
        "output": {
            "formats": [
                {
                    "type": "svf2",
                    "views": ["3d", "2d"]
                }
            ]
        }
    }
    
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "x-ads-force": "true"
    }
    
    response = requests.post(
        f"{MD_BASE_URL}/designdata/job",
        headers=headers,
        json=job_payload,
        timeout=30
    )
    response.raise_for_status()
    
    result = response.json()
    return result


def get_translation_status(token: str, object_urn: str) -> tuple[str, str]:
    """
    Check the status of a Model Derivative translation job.
    """
    response = requests.get(
        f"{MD_BASE_URL}/designdata/{object_urn}/manifest",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30
    )
    
    if response.status_code == 202:
        return "inprogress", "Manifest not ready"
    
    response.raise_for_status()
    manifest = response.json()
    
    status = manifest.get("status", "unknown")
    progress = manifest.get("progress", "N/A")
    
    return status, progress


def translate_file_in_oss(
    token: str,
    bucket_key: str,
    output_object_key: str,
    max_wait_time: int = 300,
    poll_interval: int = 15,
    verbose: bool = False,
) -> str:
    """
    Translate a file in OSS (bucket/object) into SVF for APS Viewer.
    Returns the base64 URN to use with APS Viewer.
    """
    def log(*args, **kwargs):
        if verbose:
            print(*args, **kwargs)

    log("🎯 STARTING MODEL TRANSLATION FOR VIEWING")
    log(f"Bucket: {bucket_key}")
    log(f"Object: {output_object_key}")

    oss_object_id = f"urn:adsk.objects:os.object:{bucket_key}/{output_object_key}"
    object_urn = to_md_urn(oss_object_id)
    log(f"🔗 Object URN: {object_urn}")

    job = start_svf_translation_job(token, object_urn)
    log(f"📋 Job Details: {job.get('urn', 'No URN in response')}")
    log("⏳ Monitoring translation progress...")

    elapsed_time = 0

    while elapsed_time < max_wait_time:
        try:
            status, progress = get_translation_status(token, object_urn)
            log(f"  > MD Status: {status} ({progress})")

            if status == "success":
                log("✅ Translation completed successfully!")
                log(f"🎉 Model ready for viewing with URN: {object_urn}")
                return object_urn
            if status == "failed":
                log("❌ Translation failed!")
                raise RuntimeError("Model Derivative translation failed")
            if status in ["inprogress", "pending"]:
                pass
            else:
                log(f"⚠️ Unknown status: {status}")
        except requests.exceptions.RequestException as exc:
            log(f"⚠️ Error checking translation status: {exc}. Retrying...")

        time.sleep(poll_interval)
        elapsed_time += poll_interval

    log(f"⏰ Translation timed out after {max_wait_time}s")
    raise RuntimeError(f"Translation timeout after {max_wait_time} seconds")

def get_translation_info(token: str, object_urn: str) -> dict[str, Any]:
    """
    Get detailed information about a translated model.
    """
    
    response = requests.get(
        f"{MD_BASE_URL}/designdata/{object_urn}/manifest",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30
    )
    response.raise_for_status()
    
    manifest = response.json()
    
    # Extract useful information
    info = {
        "status": manifest.get("status"),
        "progress": manifest.get("progress"),
        "type": manifest.get("type"),
        "region": manifest.get("region"),
        "urn": object_urn
    }
    
    # Get derivative information
    derivatives = manifest.get("derivatives", [])
    if derivatives:
        derivative = derivatives[0]
        info.update({
            "output_type": derivative.get("outputType"),
            "has_thumbnail": "thumbnail" in derivative,
            "children_count": len(derivative.get("children", [])),
        })
    
    return info

====================================================================================================
FILE: aps_automation_sdk/signing.py
====================================================================================================

import base64
import json
from functools import lru_cache
from importlib import import_module
from pathlib import Path
from types import ModuleType
from typing import Annotated, Any


@lru_cache(maxsize=1)
def require_cryptography_modules() -> Annotated[tuple[ModuleType, ModuleType, ModuleType], "Loaded cryptography modules: hashes, padding, rsa"]:
    """
    Load cryptography modules lazily so the SDK can be imported without the signing extra.

    The result is cached so imports happen at most once per process.
    Raises ``RuntimeError`` with install instructions if the ``cryptography``
    package is not installed.
    """
    try:
        hashes = import_module("cryptography.hazmat.primitives.hashes")
        padding = import_module("cryptography.hazmat.primitives.asymmetric.padding")
        rsa = import_module("cryptography.hazmat.primitives.asymmetric.rsa")
    except ImportError as exc:
        raise RuntimeError(
            "Signing features require cryptography. Install with: uv add \"aps-automation-sdk[signing]\" (or pip install \"aps-automation-sdk[signing]\")."
        ) from exc
    return hashes, padding, rsa


def base64_encode(
    data: Annotated[bytes, "Raw bytes to encode as base64 text"],
) -> Annotated[str, "ASCII base64 encoded string"]:
    """Encode bytes as a standard (not URL-safe) base64 ASCII string."""
    return base64.b64encode(data).decode("ascii")


def base64_decode(
    text: Annotated[str, "ASCII base64 encoded string"],
) -> Annotated[bytes, "Decoded raw bytes"]:
    """Decode a standard base64 ASCII string back to raw bytes."""
    return base64.b64decode(text)


def int_to_bytes(
    value: Annotated[int, "Positive integer to convert to big-endian bytes"],
) -> Annotated[bytes, "Big-endian bytes"]:
    """Convert a positive integer to its minimal big-endian byte representation."""
    byte_length = (value.bit_length() + 7) // 8
    return value.to_bytes(byte_length, byteorder="big")


def bytes_to_int(
    data: Annotated[bytes, "Big-endian bytes to convert into integer"],
) -> Annotated[int, "Converted integer value"]:
    """Interpret big-endian bytes as an unsigned integer."""
    return int.from_bytes(data, byteorder="big")


def generate_key_file(
    keyfile: Annotated[str, "Path to write private key JSON file"],
    key_size: Annotated[int, "RSA key size in bits (recommended 2048)"] = 2048,
) -> Annotated[str, "Absolute path where the private key JSON file was saved"]:
    """
    Generate an RSA private key file in Autodesk signer-compatible JSON format.

    Writes a JSON file with base64-encoded CRT components (D, DP, DQ, Exponent,
    InverseQ, Modulus, P, Q). Keep this file secret and never commit it to
    source control. Use ``export_public_key`` to extract the public portion.

    Returns:
        Absolute path of the written key file.
    """
    _, _, rsa = require_cryptography_modules()
    private_key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)
    private_numbers = private_key.private_numbers()
    public_numbers = private_numbers.public_numbers

    key_data = {
        "D": base64_encode(int_to_bytes(private_numbers.d)),
        "DP": base64_encode(int_to_bytes(private_numbers.dmp1)),
        "DQ": base64_encode(int_to_bytes(private_numbers.dmq1)),
        "Exponent": base64_encode(int_to_bytes(public_numbers.e)),
        "InverseQ": base64_encode(int_to_bytes(private_numbers.iqmp)),
        "Modulus": base64_encode(int_to_bytes(public_numbers.n)),
        "P": base64_encode(int_to_bytes(private_numbers.p)),
        "Q": base64_encode(int_to_bytes(private_numbers.q)),
    }

    output_path = Path(keyfile)
    output_path.write_text(json.dumps(key_data, indent=2), encoding="utf-8")
    return str(output_path.resolve())


def export_public_key(
    keyfile: Annotated[str, "Path to existing private key JSON file"],
    pubkeyfile: Annotated[str, "Path to write exported public key JSON file"],
) -> None:
    """
    Export only the public key fields (Exponent and Modulus) from a private key JSON.

    The resulting file is safe to share and is the payload expected by
    ``upload_public_key`` / ``aps-automation public-key upload``.
    """
    key_path = Path(keyfile)
    if not key_path.exists():
        raise FileNotFoundError(f"Key file not found: {keyfile}")

    key_data = json.loads(key_path.read_text(encoding="utf-8"))
    public_key_data = {
        "Exponent": key_data["Exponent"],
        "Modulus": key_data["Modulus"],
    }

    output_path = Path(pubkeyfile)
    output_path.write_text(json.dumps(public_key_data, indent=2), encoding="utf-8")


def load_private_key_data(
    keyfile: Annotated[str, "Path to private key JSON file"],
) -> Annotated[dict[str, Any], "Parsed private key payload"]:
    """
    Load and validate a private key JSON file produced by ``generate_key_file``.

    Raises:
        FileNotFoundError: If the key file does not exist.
        ValueError: If required CRT fields (D, Exponent, InverseQ, Modulus, P, Q)
            are missing from the payload.
    """
    key_path = Path(keyfile)
    if not key_path.exists():
        raise FileNotFoundError(f"Key file not found: {keyfile}")
    key_data = json.loads(key_path.read_text(encoding="utf-8"))
    required = ["D", "Exponent", "InverseQ", "Modulus", "P", "Q"]
    missing = [field for field in required if field not in key_data]
    if missing:
        raise ValueError(f"Invalid key file. Missing fields: {', '.join(missing)}")
    return key_data


def sign_activity(
    keyfile: Annotated[str, "Path to private key JSON file used for signing"],
    activity_id: Annotated[str, "Full activity id in format nickname.Activity+alias"],
) -> Annotated[str, "Base64 RSA-SHA256 signature for the activity id"]:
    """
    Sign an activity ID with RSA-SHA256 PKCS#1 v1.5 and return a base64 signature.

    The activity ID is encoded as UTF-16-LE before signing, matching the encoding
    expected by APS for workitem signature verification.

    Args:
        keyfile: Path to a private key JSON file produced by ``generate_key_file``.
        activity_id: Full activity identifier in ``nickname.Activity+alias`` form.

    Returns:
        Base64-encoded signature to pass as ``activity_signature`` in
        ``WorkItemAcc.run_public_activity``.
    """
    hashes, padding, rsa = require_cryptography_modules()
    key_data = load_private_key_data(keyfile)

    d = bytes_to_int(base64_decode(key_data["D"]))
    p = bytes_to_int(base64_decode(key_data["P"]))
    q = bytes_to_int(base64_decode(key_data["Q"]))
    e = bytes_to_int(base64_decode(key_data["Exponent"]))
    n = bytes_to_int(base64_decode(key_data["Modulus"]))
    iqmp = bytes_to_int(base64_decode(key_data["InverseQ"]))

    dmp1 = bytes_to_int(base64_decode(key_data["DP"])) if "DP" in key_data else d % (p - 1)
    dmq1 = bytes_to_int(base64_decode(key_data["DQ"])) if "DQ" in key_data else d % (q - 1)

    private_numbers = rsa.RSAPrivateNumbers(
        p=p,
        q=q,
        d=d,
        dmp1=dmp1,
        dmq1=dmq1,
        iqmp=iqmp,
        public_numbers=rsa.RSAPublicNumbers(e=e, n=n),
    )
    private_key = private_numbers.private_key()

    message = activity_id.encode("utf-16-le")
    signature = private_key.sign(message, padding.PKCS1v15(), hashes.SHA256())
    return base64_encode(signature)

====================================================================================================
FILE: aps_automation_sdk/ssa.py
====================================================================================================

import os
import time
from dataclasses import dataclass

import jwt
import requests

APS_BASE_URL = "https://developer.api.autodesk.com"
AUTH_TOKEN_URL = f"{APS_BASE_URL}/authentication/v2/token"

DEFAULT_SSA_SCOPES = "code:all data:read data:write data:create data:search"


@dataclass(frozen=True)
class SsaConfig:
    """Configuration required to mint a 3-legged token via Secure Service Accounts."""

    client_id: str
    client_secret: str
    service_account_id: str
    key_id: str
    private_key: str
    scope: str

    @classmethod
    def from_env(cls) -> "SsaConfig":
        """Load SSA config from environment variables.

        Supported variables:
        - CLIENT_ID_SSA (fallback: APS_SSA_CLIENT_ID, APS_CLIENT_ID, CLIENT_ID)
        - CLIENT_SECRET_SSA (fallback: APS_SSA_CLIENT_SECRET, APS_CLIENT_SECRET, CLIENT_SECRET)
        - APS_SSA_SERVICE_ACCOUNT_ID
        - APS_SSA_KEY_ID
        - APS_SSA_PRIVATE_KEY (supports escaped newlines)
        - APS_SSA_SCOPE (default: code:all data:read data:write data:create data:search)
        """
        client_id = (
            os.getenv("CLIENT_ID_SSA")
            or os.getenv("APS_SSA_CLIENT_ID")
            or os.getenv("APS_CLIENT_ID")
            or os.getenv("CLIENT_ID")
        )
        client_secret = (
            os.getenv("CLIENT_SECRET_SSA")
            or os.getenv("APS_SSA_CLIENT_SECRET")
            or os.getenv("APS_CLIENT_SECRET")
            or os.getenv("CLIENT_SECRET")
        )
        required = {
            "CLIENT_ID_SSA": client_id,
            "CLIENT_SECRET_SSA": client_secret,
            "APS_SSA_SERVICE_ACCOUNT_ID": os.getenv("APS_SSA_SERVICE_ACCOUNT_ID"),
            "APS_SSA_KEY_ID": os.getenv("APS_SSA_KEY_ID"),
            "APS_SSA_PRIVATE_KEY": os.getenv("APS_SSA_PRIVATE_KEY"),
        }
        missing = [name for name, value in required.items() if not value]
        if missing:
            raise RuntimeError(
                "Missing required environment variables: " + ", ".join(sorted(missing))
            )

        scope = os.getenv("APS_SSA_SCOPE", DEFAULT_SSA_SCOPES)

        return cls(
            client_id=str(required["CLIENT_ID_SSA"]),
            client_secret=str(required["CLIENT_SECRET_SSA"]),
            service_account_id=str(required["APS_SSA_SERVICE_ACCOUNT_ID"]),
            key_id=str(required["APS_SSA_KEY_ID"]),
            private_key=str(required["APS_SSA_PRIVATE_KEY"]).replace("\\n", "\n"),
            scope=" ".join(scope.split()),
        )

    @property
    def scopes(self) -> list[str]:
        values = [scope for scope in self.scope.split(" ") if scope]
        if not values:
            raise RuntimeError("APS_SSA_SCOPE must contain at least one scope")
        return values


def build_ssa_jwt(config: SsaConfig, expires_in_seconds: int = 300) -> str:
    """Build a JWT assertion for SSA token exchange.

    APS requires:
    - iss = APS client id
    - sub = Service account Oxygen id
    - aud = https://developer.api.autodesk.com/authentication/v2/token
    - exp <= now + 300 seconds
    """
    if not 1 <= expires_in_seconds <= 300:
        raise ValueError("expires_in_seconds must be between 1 and 300")

    now = int(time.time())
    claims = {
        "iss": config.client_id,
        "sub": config.service_account_id,
        "aud": AUTH_TOKEN_URL,
        "exp": now + expires_in_seconds,
        "scope": config.scopes,
    }
    headers = {
        "alg": "RS256",
        "kid": config.key_id,
    }

    return jwt.encode(
        claims,
        config.private_key,
        algorithm="RS256",
        headers=headers,
    )


def parse_token_response(payload: dict[str, object]) -> str:
    """Validate and extract the bearer access token from APS token response payload."""
    access_token = payload.get("access_token")
    token_type = str(payload.get("token_type", "")).lower()

    if not access_token or token_type != "bearer":
        raise RuntimeError(f"Unexpected SSA token response: {payload}")

    return str(access_token)


def exchange_jwt_assertion_for_token(
    client_id: str,
    client_secret: str,
    assertion: str,
    scope: str,
) -> str:
    """Call POST /authentication/v2/token with JWT bearer grant to mint a 3LO token."""
    response = requests.post(
        AUTH_TOKEN_URL,
        headers={
            "Accept": "application/json",
            "Content-Type": "application/x-www-form-urlencoded",
        },
        data={
            "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
            "assertion": assertion,
            "scope": " ".join(scope.split()),
        },
        auth=(client_id, client_secret),
        timeout=30,
    )
    response.raise_for_status()
    return parse_token_response(response.json())


def get_ssa_3lo_token(config: SsaConfig, expires_in_seconds: int = 300) -> str:
    """Mint a 3-legged token from SSA config."""
    assertion = build_ssa_jwt(config=config, expires_in_seconds=expires_in_seconds)
    return exchange_jwt_assertion_for_token(
        client_id=config.client_id,
        client_secret=config.client_secret,
        assertion=assertion,
        scope=config.scope,
    )

====================================================================================================
FILE: aps_automation_sdk/utils.py
====================================================================================================

from typing import Annotated, Any, Literal

import requests

from aps_automation_sdk.core import upload_appbundle
from aps_automation_sdk.dsl import RegisterBundleResponse

APS_BASE_URL = "https://developer.api.autodesk.com"
OSS_V2_BASE_URL = f"{APS_BASE_URL}/oss/v2"
DA_BASE_URL = f"{APS_BASE_URL}/da/us-east/v3"
AUTH_URL = f"{APS_BASE_URL}/authentication/v2/token"
FORGEAPPS_ME_URL = f"{DA_BASE_URL}/forgeapps/me"

SCOPES = "data:read data:write data:create bucket:create bucket:read code:all"


def get_token(client_id: str, client_secret: str) -> str:
    response = requests.post(
        AUTH_URL,
        data={
            "client_id": client_id,
            "client_secret": client_secret,
            "grant_type": "client_credentials",
            "scope": SCOPES,
        },
        timeout=15,
    )
    response.raise_for_status()
    return response.json()["access_token"]


def get_forgeapp_profile(token: str) -> dict[str, Any]:
    r = requests.get(
        FORGEAPPS_ME_URL,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        timeout=30,
    )
    r.raise_for_status()
    return r.json()


def parse_json_response_or_none(response: requests.Response) -> Any | None:
    body = response.text
    if not body or not body.strip():
        return None
    try:
        return response.json()
    except ValueError:
        return None


def upload_public_key(token: str, public_key: dict[str, Any]) -> dict[str, Any]:
    if not isinstance(public_key, dict) or not public_key:
        raise ValueError("public_key must be a non-empty dictionary")
    r = requests.patch(
        FORGEAPPS_ME_URL,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        json={"publicKey": public_key},
        timeout=30,
    )
    r.raise_for_status()
    payload = parse_json_response_or_none(r)
    if isinstance(payload, dict):
        return payload

    profile = get_forgeapp_profile(token)
    if isinstance(profile, dict):
        return profile
    if isinstance(profile, str):
        return {"nickname": profile}
    return {"forgeappProfile": profile}


def set_nickname(token: str, nickname: str) -> str:
    """
    Try to set the nickname.
    Returns the nickname that actually applies.
    """
    url = f"{DA_BASE_URL}/forgeapps/me"
    r = requests.patch(
        url,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        json={"nickname": nickname},
        timeout=30,
    )

    if r.status_code == 200:
        return nickname

    if r.status_code == 409:
        # App already has resources, nickname is locked, keep the current one
        return get_nickname(token)

    # Often 400 means the nickname is already taken
    try:
        details = r.json()
    except Exception:
        details = r.text
    raise RuntimeError(f"Could not set nickname, status {r.status_code}, details {details}")


def get_nickname(token: str) -> str:
    url = f"{DA_BASE_URL}/forgeapps/me"
    r = requests.get(
        url,
        headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        timeout=30,
    )
    r.raise_for_status()
    # The API returns a JSON object with nickname and publicKey
    # Example response: {"nickname": "viktortest", "publicKey": {...}}
    response_data = r.json()
    return response_data.get("nickname", response_data)


def create_bucket(
    bucketKey: Annotated[str, "Unique Name you assign to a bucket, Possible values: -_.a-z0-9 (between 3-128 characters in length"],
    token: Annotated[str, "2Lo token"],
    policy_key: Literal["transient", "temporary", "persistent"] = "transient",
    access: None | Literal["full", "read"] = "full",
    region: Literal["US", "EMEA", "AUS", "CAN", "DEU", "IND", "JPN", "GBN"] = "US",
) -> dict[str, Any]:
    """
    Create a bucket in OSS v2.
    https://aps.autodesk.com/en/docs/data/v2/reference/http/buckets-POST/
    """
    url = f"{OSS_V2_BASE_URL}/buckets"
    payload = {"bucketKey": bucketKey, "access": access, "policyKey": policy_key}
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "x-ads-region": region,
    }
    r = requests.post(url, headers=headers, json=payload, timeout=30)
    r.raise_for_status()
    return r.json()


def delete_appbundle(appbundleId: str, token: str) -> int:
    url = f"{DA_BASE_URL}/appbundles/{appbundleId}"
    header = {"Authorization": f"Bearer {token}"}
    r = requests.delete(url=url, headers=header, timeout=30)
    r.raise_for_status()
    return r.status_code


def delete_activity(activityId: str, token: str) -> int:
    url = f"{DA_BASE_URL}/activities/{activityId}"
    header = {"Authorization": f"Bearer {token}"}
    r = requests.delete(url=url, headers=header, timeout=30)
    r.raise_for_status()
    return r.status_code


def create_appbundle_version(app_id: str, engine: str, description: str, token: str) -> RegisterBundleResponse:
    """
    POST /appbundles/{id}/versions
    Returns JSON with 'version' and 'uploadParameters' for S3.
    """
    url = f"{DA_BASE_URL}/appbundles/{app_id}/versions"
    payload = {"id": None, "engine": engine, "description": description}
    r = requests.post(
        url,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        json=payload,
        timeout=60,
    )
    r.raise_for_status()
    return RegisterBundleResponse.model_validate(r.json())


def move_or_create_alias(app_id: str, alias_id: str, version: int, token: str) -> dict[str, Any]:
    """
    PATCH alias to the new version. If the alias does not exist, create it.
    """
    url = f"{DA_BASE_URL}/appbundles/{app_id}/aliases/{alias_id}"
    r = requests.patch(
        url,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        json={"version": version},
        timeout=30,
    )
    if r.status_code == 404:
        create_url = f"{DA_BASE_URL}/appbundles/{app_id}/aliases"
        r = requests.post(
            create_url,
            headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
            json={"id": alias_id, "version": version},
            timeout=30,
        )
    r.raise_for_status()
    return r.json()


def publish_appbundle_update(
    appbundle_id: str,
    engine: str,
    alias_id: str,
    zip_path: str,
    token: str,
    description: str = "Automated update",
) -> dict[str, Any]:
    """
    Update the running Activity without breaking users:
    1) Create a new AppBundle version
    2) Upload the new .bundle.zip to S3
    3) Move (or create) the alias to the new version

    Returns:
        {
            "appbundle_id": ...,
            "new_version": <int>,
            "alias": ...,
            "alias_version": <int>,
            "s3_status": <int>
        }
    """
    version_resp: RegisterBundleResponse = create_appbundle_version(appbundle_id, engine, description, token)
    version_no = version_resp.version
    s3_status = upload_appbundle(upload_parameters=version_resp.uploadParameters, zip_path=zip_path)
    alias_resp = move_or_create_alias(appbundle_id, alias_id, version_no, token)

    return {
        "appbundle_id": appbundle_id,
        "new_version": version_no,
        "alias": alias_id,
        "alias_version": alias_resp.get("version", version_no),
        "s3_status": s3_status,
    }

====================================================================================================
FILE: examples/Revit_02_export_to_ifc/files/export_settings.json
====================================================================================================

{
  "view_names": ["Foundations"],
  "FileVersion": "IFC4",
  "IFCFileType": "IFC",
  "ExportBaseQuantities": true,
  "SpaceBoundaryLevel": 2,
  "FamilyMappingFile": "C:\\Data\\ifc_mapping.txt",
  "ExportInternalRevitPropertySets": false,
  "ExportIFCCommonPropertySets": true,
  "ExportAnnotations": false,
  "Export2DElements": false,
  "ExportRoomsInView": false,
  "VisibleElementsOfCurrentView": false,
  "ExportLinkedFiles": false,
  "IncludeSteelElements": false,
  "ExportPartsAsBuildingElements": true,
  "UseActiveViewGeometry": false,
  "UseFamilyAndTypeNameForReference": false,
  "Use2DRoomBoundaryForVolume": false,
  "IncludeSiteElevation": false,
  "ExportBoundingBox": false,
  "ExportSolidModelRep": false,
  "StoreIFCGUID": false,
  "ExportSchedulesAsPsets": false,
  "ExportSpecificSchedules": false,
  "ExportUserDefinedPsets": false,
  "ExportUserDefinedPsetsFileName": "",
  "ExportUserDefinedParameterMapping": false,
  "ExportUserDefinedParameterMappingFileName": "",
  "ActivePhase": "",
  "SitePlacement": 0,
  "TessellationLevelOfDetail": 0.0,
  "UseOnlyTriangulation": false
}

====================================================================================================
FILE: examples/Revit_03_create_structural_elements/files/structure.json
====================================================================================================

{
  "units": "m",
  "connectivity": {
    "1": { "x": 0.0, "y": 0.0, "z": 0.0 },
    "2": { "x": 0.0, "y": 6.0, "z": 0.0 },
    "3": { "x": 0.0, "y": 0.0, "z": 4.0 },
    "4": { "x": 0.0, "y": 6.0, "z": 4.0 },
    "5": { "x": 4.0, "y": 0.0, "z": 0.0 },
    "6": { "x": 4.0, "y": 6.0, "z": 0.0 },
    "7": { "x": 4.0, "y": 0.0, "z": 4.0 },
    "8": { "x": 4.0, "y": 6.0, "z": 4.0 }
  },
  "lines": {
    "1": { "nodeI": "1", "nodeJ": "3", "section": "UB305x127x37" },
    "2": { "nodeI": "2", "nodeJ": "4", "section": "UB305x127x37" },
    "3": { "nodeI": "5", "nodeJ": "7", "section": "UB305x127x37" },
    "4": { "nodeI": "6", "nodeJ": "8", "section": "UB305x127x37" },
    "5": { "nodeI": "3", "nodeJ": "4", "section": "UB305x127x37" },
    "6": { "nodeI": "7", "nodeJ": "8", "section": "UB305x127x37" },
    "7": { "nodeI": "3", "nodeJ": "7", "section": "UB305x127x37" },
    "8": { "nodeI": "4", "nodeJ": "8", "section": "UB305x127x37" },
    "9": { "nodeI": "1", "nodeJ": "2", "section": "UB305x127x37" },
    "10": { "nodeI": "5", "nodeJ": "6", "section": "UB305x127x37" }
  }
}

