Source code for qudas.annealing.executor

from __future__ import annotations

import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Any, Tuple, Optional, List

from qudas.core.base import QdExecutorBase
from qudas.core.statistics import energy_statistics

from .input import QdAnnealingInput
from .output import QdAnnealingOutput


class QdAnnealingExecutor(QdExecutorBase):
    """Annealing executor that supports parallel execution on multiple devices.

    Each input block carries a QUBO (``block.qubo``) and a label
    (``block.label``).  The label selects the provider and the provider
    configuration through ``resolve_provider`` / ``resolve_provider_config``
    (both inherited from :class:`QdExecutorBase`).
    """

    # --------------------------------------------------------------
    # Constructor / parameters
    # --------------------------------------------------------------
    def __init__(
        self,
        provider: str = "default",
        provider_config: Optional[Dict[str, Any]] = None,
        provider_map: Optional[Dict[str, str]] = None,
        provider_config_map: Optional[Dict[str, Dict[str, Any]]] = None,
    ):
        """Create the executor.

        Parameters
        ----------
        provider : str, optional
            The provider to use for the executor. (e.g. "amplify", "dimod")
        provider_config : dict[str, Any], optional
            The configuration for the provider.
        provider_map : dict[str, str], optional
            The mapping of block labels to providers.
            (e.g. {"block0": "amplify", "block1": "dimod"})
        provider_config_map : dict[str, dict[str, Any]], optional
            The mapping of block labels to provider configurations.
            (e.g. {"block0": {"backend": "amplify"}, "block1": {"backend": "dimod"}})
        """
        super().__init__(provider, provider_config, provider_map, provider_config_map)

    # --------------------------------------------------------------
    # Public API
    # --------------------------------------------------------------
    def run(
        self, input_data: QdAnnealingInput
    ) -> QdAnnealingOutput:  # noqa: D401 – simple method name
        """Execute a single :class:`QdAnnealingInput` and return its output.

        Parameters
        ----------
        input_data : QdAnnealingInput
            Input holding the annealing block to execute (``input_data.block``).

        Returns
        -------
        QdAnnealingOutput
            Output whose ``results`` dict is keyed by the block label and
            holds the backend result as its value.
        """
        block = input_data.block
        provider = self.resolve_provider(block.label)
        config = self.resolve_provider_config(block.label)

        _, raw = self._run_single_block(block, provider, config)
        return self._to_output(block.label, raw, provider)

    def run_split(
        self, input_data: QdAnnealingInput
    ) -> QdAnnealingOutput:  # noqa: D401 – simple method name
        """Execute every block of ``input_data`` in parallel and merge the results.

        Parameters
        ----------
        input_data : QdAnnealingInput
            Input holding the annealing blocks to execute (``input_data.blocks``).

        Returns
        -------
        QdAnnealingOutput
            Output whose ``results`` dict is keyed by block label.  Entries
            are inserted in the order of ``input_data.blocks``, independent
            of which worker thread finishes first.

        Raises
        ------
        AttributeError
            If ``input_data`` has no ``blocks`` attribute.
        """
        if not hasattr(input_data, "blocks"):
            raise AttributeError("input_data は 'blocks' 属性を持つ必要があります。")

        results: Dict[str, Dict[str, Any]] = {}

        # Original author's note: the work is not CPU-bound, so a thread pool
        # is considered sufficient.
        with ThreadPoolExecutor() as pool:
            submitted = []
            for block in input_data.blocks:
                # Resolve the provider once; it is needed both for the solver
                # call and for converting the raw result afterwards.
                provider = self.resolve_provider(block.label)
                future = pool.submit(
                    self._run_single_block,
                    block,
                    provider,
                    self.resolve_provider_config(block.label),
                )
                submitted.append((block.label, provider, future))

            # Collect in submission order so the merged dict (and the winner
            # among duplicate labels) does not depend on thread timing.
            for label, provider, future in submitted:
                _, raw = future.result()
                results.update(self._to_output(label, raw, provider).results)

        return QdAnnealingOutput(results)

    # --------------------------------------------------------------
    # Internal utilities
    # --------------------------------------------------------------
    @staticmethod
    def _to_output(label: str, raw: Any, provider: str) -> QdAnnealingOutput:
        """Convert a raw backend result and file it under ``label``."""
        out = QdAnnealingOutput.from_sdk_format(raw, target=provider)
        # The converted output may carry the result under the key "block0"
        # (presumably the converter's default label – confirm against
        # ``from_sdk_format``); move it to the real block label.
        if label != "block0" and "block0" in out.results:
            out.results[label] = out.results.pop("block0")
        return out

    def _run_single_block(self, block, provider: str, kwargs: Optional[Dict[str, Any]]):
        """Solve the QUBO of one block on the given backend.

        Parameters
        ----------
        block
            Object exposing ``qubo`` and ``label``.
        provider : str
            ``"amplify"``, ``"dimod"`` or ``"default"`` (treated as dimod).
        kwargs : dict[str, Any] or None
            Provider configuration forwarded to the backend as keyword
            arguments.  ``None`` is treated as an empty configuration.

        Returns
        -------
        tuple
            ``(block.label, raw_result)``.

        Raises
        ------
        NotImplementedError
            If ``provider`` is not one of the supported names.
        """
        # Guard against a missing configuration: ``**None`` would raise TypeError.
        options = kwargs or {}

        if provider == "amplify":
            raw = self._run_amplify(block.qubo, **options)
        elif provider in ("dimod", "default"):
            raw = self._run_dimod(block.qubo, **options)
        else:
            raise NotImplementedError(f"Provider '{provider}' は未サポートです。")
        return block.label, raw

    @staticmethod
    def _fallback(qubo: Dict[Tuple[str, str], float], backend: str, exc: Exception):
        """Log why ``backend`` failed and return the naive all-zero result."""
        import logging  # local import, same style as the optional backends

        logging.getLogger(__name__).warning(
            "%s backend failed (%s: %s); falling back to the naive all-zero solution.",
            backend,
            type(exc).__name__,
            exc,
        )
        return QdAnnealingExecutor._run_naive_with_stats(
            qubo, device=f"{backend}(fallback)"
        )

    # ------------------------------------------------------------------
    # Backend implementations
    # ------------------------------------------------------------------
    @staticmethod
    def _run_amplify(qubo: Dict[Tuple[str, str], float], **kwargs):
        """Solve ``qubo`` with Fixstars Amplify.

        The API token is read from the ``AMPLIFY_TOKEN`` environment variable.
        Any failure (SDK not installed, no/invalid token, solver error, ...)
        is logged and the naive fallback result is returned instead.
        ``kwargs`` is accepted for interface symmetry but currently unused.
        """
        try:
            from amplify import VariableGenerator, Model, FixstarsClient, solve  # type: ignore

            # QUBO dict -> Amplify polynomial: one binary variable per name.
            gen = VariableGenerator()
            variables = {}
            for key in qubo:
                for var_name in key:
                    if var_name not in variables:
                        variables[var_name] = gen.scalar("Binary", name=str(var_name))

            poly = 0
            for key, coeff in qubo.items():
                term = 1
                for var_name in key:
                    term *= variables[var_name]
                poly += coeff * term

            model = Model(poly)
            client = FixstarsClient()
            token = os.getenv("AMPLIFY_TOKEN")
            if token:
                client.token = token
            return solve(model, client)
        except Exception as exc:  # noqa: BLE001 – any failure -> fallback
            return QdAnnealingExecutor._fallback(qubo, "amplify", exc)

    @staticmethod
    def _run_dimod(qubo: Dict[Tuple[str, str], float], **kwargs):
        """Solve ``qubo`` with dimod's ``ExactSolver``.

        Diagonal entries ``(i, i)`` become linear biases, all other entries
        quadratic biases.  Any failure (dimod not installed, malformed keys,
        ...) is logged and the naive fallback result is returned instead.
        ``kwargs`` is accepted for interface symmetry but currently unused.
        """
        try:
            import dimod  # type: ignore

            linear = {}
            quadratic = {}
            for (i, j), coeff in qubo.items():
                if i == j:
                    linear[i] = coeff
                else:
                    quadratic[(i, j)] = coeff

            bqm = dimod.BinaryQuadraticModel(linear, quadratic, 0.0, vartype="BINARY")
            sampler = dimod.ExactSolver()
            return sampler.sample(bqm)
        except Exception as exc:  # noqa: BLE001 – ImportError or others
            return QdAnnealingExecutor._fallback(qubo, "dimod", exc)

    # ------------------------------------------------------------------
    # Fallback: trivial evaluation (every variable fixed to 0)
    # ------------------------------------------------------------------
    @staticmethod
    def _run_naive_with_stats(
        qubo: Dict[Tuple[str, str], float],
        device="naive",
        energies: Optional[List[float]] = None,
        **kwargs,
    ):
        """Return the all-zero assignment for ``qubo`` together with statistics.

        Parameters
        ----------
        qubo : dict[tuple[str, str], float]
            QUBO coefficients; only the variable names in the keys are used.
        device : str, optional
            Label stored under ``"device"`` in the result.
        energies : list[float], optional
            Energies to report and summarise; defaults to ``[0.0]``.

        Returns
        -------
        dict
            Keys ``"solution"``, ``"energy"``, ``"energies"``,
            ``"statistics"`` and ``"device"``.
        """
        variables = {name for key in qubo for name in key}
        solution = {name: 0 for name in variables}
        # With every variable at 0, every QUBO term vanishes.
        energy = 0.0
        energies_list = energies if energies is not None else [energy]

        stats = {
            "energy": energy_statistics(energies_list),
            "bitstring": {"unique": 1},
        }
        return {
            "solution": solution,
            "energy": energy,
            "energies": energies_list,
            "statistics": stats,
            "device": device,
        }
# エイリアス QdAnnExec = QdAnnealingExecutor