Source code for qudas.annealing.ir

from collections.abc import Mapping
from typing import Optional


[docs] class QdAnnealingIR(Mapping): """量子アニーリング用の中間表現 (QUBO) を表すクラス。 旧 `QuDataInput` で担っていた QUBO 変換・演算機能を移植した。 `qubo` は dict で保持し、キーは変数名のタプル、値は係数。 """ # ------------------------------ # コンストラクタ & 演算子オーバーロード # ------------------------------ def __init__(self, qubo: Optional[dict] = None): self.qubo: dict = {} if qubo is None: self.qubo = {} elif isinstance(qubo, dict): self.qubo = qubo else: raise TypeError(f"{type(qubo)}は対応していない型です。") # 辞書の比較でキー順を無視する際に使うユーティリティ @staticmethod def _merge_dict(a: dict, b: dict, op): """2つの QUBO dict を merge し、値は op で結合""" result = a.copy() for k, v in b.items(): if k in result: result[k] = op(result[k], v) else: result[k] = op(0, v) return result def __add__(self, other: "QdAnnealingIR") -> "QdAnnealingIR": return QdAnnealingIR( self._merge_dict(self.qubo, other.qubo, lambda x, y: x + y) ) def __sub__(self, other: "QdAnnealingIR") -> "QdAnnealingIR": return QdAnnealingIR( self._merge_dict(self.qubo, other.qubo, lambda x, y: x - y) ) def __mul__(self, other: "QdAnnealingIR") -> "QdAnnealingIR": qubo = {} for k1, v1 in self.qubo.items(): for k2, v2 in other.qubo.items(): key_set = set(k1 + k2) # 既に存在するkeyがあるか確認 found = False for _k in qubo.keys(): if key_set == set(_k): qubo[_k] += v1 * v2 found = True break if not found: if len(key_set) == 1: var = list(key_set)[0] qubo[(var, var)] = v1 * v2 else: qubo[tuple(key_set)] = v1 * v2 return QdAnnealingIR(qubo) def __pow__(self, other: int) -> "QdAnnealingIR": if isinstance(other, int): result = QdAnnealingIR(self.qubo) for _ in range(1, other): result = result * self return result raise TypeError(f"{type(other)}は対応していない型です。") # ------------------------------ # 変換 (from_*) # ------------------------------
[docs] def from_pulp(self, prob: "LpProblem") -> "QdAnnealingIR": # type: ignore[name-defined] from pulp import LpProblem # local import to avoid heavy dep if未使用 if isinstance(prob, LpProblem): qubo = {} for var in prob.objective.to_dict(): qubo[(var["name"], var["name"])] = var["value"] self.qubo = qubo return self raise TypeError(f"{type(prob)}は対応していない型です。")
[docs] def from_amplify(self, prob: "Poly") -> "QdAnnealingIR": # type: ignore[name-defined] from amplify import Poly # type: ignore if isinstance(prob, Poly): variables = prob.variables qubo: dict = {} for key, value in prob.as_dict().items(): if len(key) == 0: continue # 定数 elif len(key) == 1: qubo[(variables[key[0]].name, variables[key[0]].name)] = value elif len(key) == 2: qubo[(variables[key[0]].name, variables[key[1]].name)] = value else: raise ValueError("3変数以上は対応していません。") self.qubo = qubo return self raise TypeError(f"{type(prob)}は対応していない型です。")
[docs] def from_pyqubo(self, prob: "Base") -> "QdAnnealingIR": # type: ignore[name-defined] from pyqubo import Base # type: ignore if isinstance(prob, Base): qubo = prob.compile().to_qubo() self.qubo = qubo[0] return self raise TypeError(f"{type(prob)}は対応していない型です。")
[docs] def from_array(self, prob: "np.ndarray") -> "QdAnnealingIR": # type: ignore[name-defined] import numpy as np # noqa if isinstance(prob, np.ndarray): qubo: dict = {} for i, ai in enumerate(prob): for j, aij in enumerate(ai): if aij == 0: continue if (f"q_{j}", f"q_{i}") in qubo: qubo[(f"q_{j}", f"q_{i}")] += aij else: qubo[(f"q_{i}", f"q_{j}")] = aij self.qubo = qubo return self raise TypeError(f"{type(prob)}は対応していない型です。")
[docs] def from_csv(self, path: str, encoding: str = "utf-8-sig") -> "QdAnnealingIR": import csv # local import try: with open(path, encoding=encoding, newline="") as f: qubo: dict = {} csvreader = csv.reader(f) for i, ai in enumerate(csvreader): for j, aij in enumerate(ai): if float(aij) == 0: continue if (f"q_{j}", f"q_{i}") in qubo: qubo[(f"q_{j}", f"q_{i}")] += float(aij) else: qubo[(f"q_{i}", f"q_{j}")] = float(aij) self.qubo = qubo return self except Exception as e: raise ValueError("読み取りエラー") from e
[docs] def from_json(self, path: str) -> "QdAnnealingIR": import json # local import try: with open(path) as f: qubo: dict = {} jd = json.load(f) for q in jd["qubo"]: qubo[(q["key"][0], q["key"][1])] = q["value"] self.qubo = qubo return self except Exception as e: raise ValueError("読み取りエラー") from e
[docs] def from_networkx(self, prob: "nx.Graph") -> "QdAnnealingIR": # type: ignore[name-defined] import networkx as nx # noqa if isinstance(prob, nx.Graph): qubo: dict = {} for e in prob.edges(): if (f"q_{e[0]}", f"q_{e[1]}") in qubo: qubo[(f"q_{e[0]}", f"q_{e[1]}")] += 1 else: qubo[(f"q_{e[0]}", f"q_{e[1]}")] = 1 self.qubo = qubo return self raise TypeError(f"{type(prob)}は対応していない型です。")
[docs] def from_pandas(self, prob: "pd.DataFrame") -> "QdAnnealingIR": # type: ignore[name-defined] import pandas as pd # noqa if isinstance(prob, pd.DataFrame): key1_list = prob.columns.tolist() key2_list = prob.index.tolist() qubo: dict = {} for k1 in key1_list: for k2 in key2_list: if prob[k1][k2] == 0: continue if (k1, k2) in qubo: qubo[(k1, k2)] += prob[k1][k2] else: qubo[(k1, k2)] = prob[k1][k2] self.qubo = qubo return self raise TypeError(f"{type(prob)}は対応していない型です。")
[docs] def from_dimod_bqm(self, prob: "dimod.BinaryQuadraticModel") -> "QdAnnealingIR": # type: ignore[name-defined] import dimod # noqa if isinstance(prob, dimod.BinaryQuadraticModel): qubo = dict(prob.quadratic).copy() for k, v in prob.linear.items(): if v == 0: continue qubo[(k, k)] = v self.qubo = qubo return self raise TypeError(f"{type(prob)}は対応していない型です。")
[docs] def from_sympy(self, prob: "sympy.core.expr.Expr") -> "QdAnnealingIR": # type: ignore[name-defined] import sympy # noqa if isinstance(prob, sympy.core.expr.Expr): qubo: dict = {} for term in prob.as_ordered_terms(): v, k = term.as_coeff_mul() if len(k) == 1: variable = term.free_symbols qubo[(str(list(variable)[0]), str(list(variable)[0]))] = v else: k_tuple = tuple([str(_k) for _k in k]) qubo[k_tuple] = v self.qubo = qubo return self raise TypeError(f"{type(prob)}は対応していない型です。")
# ------------------------------ # 変換 (to_*) # ------------------------------
[docs] def to_pulp(self): from pulp import LpVariable, LpProblem, LpMinimize # local import variables = list(set(k for key in self.qubo.keys() for k in key)) q = [ LpVariable(name, lowBound=0, upBound=1, cat='Binary') for name in variables ] qubo_prob = LpProblem('QUBO', LpMinimize) _qubo = 0 for key, value in self.qubo.items(): if key[0] == key[1]: variable_index = variables.index(key[0]) _qubo += q[variable_index] * value else: raise ValueError("pulpは2変数以上に対応していません。") qubo_prob += _qubo return qubo_prob
[docs] def to_amplify(self): from amplify import VariableGenerator # type: ignore variables = sorted(set(k for key in self.qubo.keys() for k in key)) gen = VariableGenerator() labeled_q = { str(name): gen.scalar("Binary", name=str(name)) for name in variables } qubo_poly = 0 for key, value in self.qubo.items(): sub_qubo = 1 for k in key: sub_qubo *= labeled_q[k] qubo_poly += sub_qubo * value return qubo_poly
[docs] def to_pyqubo(self): from pyqubo import Binary # type: ignore variables = list(set(k for key in self.qubo.keys() for k in key)) q = [Binary(str(variable)) for variable in variables] qubo_expr = 0 for key, value in self.qubo.items(): sub_qubo = 1 for k in key: variable_index = variables.index(k) sub_qubo *= q[variable_index] qubo_expr += sub_qubo * value return qubo_expr
[docs] def to_array(self): import numpy as np # noqa variables = sorted(list(set(k for key in self.qubo.keys() for k in key))) qubo_arr = np.zeros((len(variables), len(variables))) for key, value in self.qubo.items(): if len(key) == 2: variable_index_0 = variables.index(key[0]) variable_index_1 = variables.index(key[1]) qubo_arr[variable_index_0, variable_index_1] = value else: raise ValueError("matrixは3変数以上に対応していません。") return qubo_arr
[docs] def to_csv(self, name: str = "qudata") -> None: import csv # noqa qubo_arr = self.to_array() try: with open(f"{name}.csv", 'w') as f: writer = csv.writer(f) writer.writerows(qubo_arr) except Exception as e: raise ValueError("書き出しエラー") from e
[docs] def to_json(self, name: str = "qudata") -> None: import json # noqa qubo_json = [ {"key": list(key), "value": value} for key, value in self.qubo.items() ] try: with open(f"{name}.json", 'w') as f: json.dump({"qubo": qubo_json}, f, indent=4) except Exception as e: raise ValueError("書き出しエラー") from e
[docs] def to_networkx(self): import networkx as nx # noqa variables = list(set(k for key in self.qubo.keys() for k in key)) G = nx.Graph() G.add_nodes_from(variables) for key, value in self.qubo.items(): if len(key) == 2 and key[0] != key[1]: G.add_edge(key[0], key[1], weight=value) return G
[docs] def to_pandas(self): import pandas as pd # noqa variables = sorted(set(k for key in self.qubo.keys() for k in key)) df = pd.DataFrame(0.0, index=variables, columns=variables) for key, value in self.qubo.items(): df.loc[key[0], key[1]] = value return df
[docs] def to_dimod_bqm(self): import dimod # noqa linear = {} quadratic = {} for key, value in self.qubo.items(): if key[0] == key[1]: linear[key[0]] = value else: quadratic[(key[0], key[1])] = value return dimod.BinaryQuadraticModel(linear, quadratic, 0.0, vartype='BINARY')
[docs] def to_sympy(self): import sympy # noqa expr = 0 for (i, j), value in self.qubo.items(): if i == j: # 対角項(一次項として扱う) expr += sympy.Symbol(i) * value else: # 非対角項(二次項) expr += sympy.Symbol(i) * sympy.Symbol(j) * value return expr
# ------------------------------ # ユーティリティ # ------------------------------
[docs] def to_dict(self) -> dict: """簡易ダンプ""" return self.qubo
# alias for old name
[docs] @classmethod def from_dict(cls, data: dict) -> "QdAnnealingIR": return cls(qubo=data)
# 旧 API との互換性: qubo プロパティを prob にも提供 @property def prob(self): return self.qubo # 比較用 def __eq__(self, other: object) -> bool: if not isinstance(other, QdAnnealingIR): return False return self.qubo == other.qubo # Mapping インターフェース def __len__(self): return len(self.qubo) def __iter__(self): return iter(self.qubo) def __getitem__(self, key): return self.qubo[key]
QdAnnIR = QdAnnealingIR