Coverage for physioblocks / description / nets.py: 99%

209 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-09 16:40 +0100

1# SPDX-FileCopyrightText: Copyright INRIA 

2# 

3# SPDX-License-Identifier: LGPL-3.0-only 

4# 

5# Copyright INRIA 

6# 

7# This file is part of PhysioBlocks, a library mostly developed by the 

8# [Ananke project-team](https://team.inria.fr/ananke) at INRIA. 

9# 

10# Authors: 

11# - Colin Drieu 

12# - Dominique Chapelle 

13# - François Kimmig 

14# - Philippe Moireau 

15# 

16# PhysioBlocks is free software: you can redistribute it and/or modify it under the 

17# terms of the GNU Lesser General Public License as published by the Free Software 

18# Foundation, version 3 of the License. 

19# 

20# PhysioBlocks is distributed in the hope that it will be useful, but WITHOUT ANY 

21# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 

22# PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. 

23# 

24# You should have received a copy of the GNU Lesser General Public License along with 

25# PhysioBlocks. If not, see <https://www.gnu.org/licenses/>. 

26 

27""" 

28Define the :class:`~.Net` object that organises 

29:class:`~physioblocks.description.blocks.BlockDescription` and 

30:class:`~.Node` to describe the global system. 

31""" 

32 

33from __future__ import annotations 

34 

35from dataclasses import dataclass 

36from pprint import pformat 

37from typing import Any 

38 

39from physioblocks.description.blocks import ( 

40 ID_SEPARATOR, 

41 BlockDescription, 

42 ModelComponentDescription, 

43) 

44from physioblocks.description.flux import Dof, get_flux_dof_register 

45from physioblocks.registers.type_register import register_type 

46 

# Register of all defined flux types. This module reads its
# ``dof_flux_couples`` (dof type -> flux type) and ``flux_dof_couples``
# (flux type -> dof type) mappings.
_flux_type_register = get_flux_dof_register()

# Type id under which the Net class is registered
NET_TYPE_ID = "net"

# Type id under which the BoundaryCondition class is registered
BOUNDARY_CONDITION_ID = "condition"

55 

56 

@dataclass
@register_type(BOUNDARY_CONDITION_ID)
class BoundaryCondition:
    """
    Description of a boundary condition set at a :class:`~.Node`.

    It pairs the type the condition applies to (a flux type or a dof type)
    with the id of the parameter associated with the condition.
    """

    condition_type: str
    """The id of the flux or dof type the condition applies to"""

    condition_id: str
    """The id (name) of the condition parameter"""

68 

69 

class Node:
    """
    **Global Node** in a :class:`~.Net` object.

    A node holds a set of :class:`~physioblocks.description.flux.Dof` and
    :class:`~.BoundaryCondition` objects. It is meant to define one dof per
    **Flux Type**.

    For every block sharing a flux at the node, it stores the name of the block
    in the net together with the local node index of the flux.

    Different **Flux Types** can be shared at the same node, but they do not
    mix (i.e. **Fluxes** are only summed with the other **Fluxes** of the same
    type). Consequently, each node adds one equation per **Flux Type** to the
    **Global System**.

    :param node_id: the name of the node in the net
    :type node_id: str
    """

    _unique_id: str
    """the node id in the net"""

    _dofs: list[Dof]
    """the DOFs of the fluxes at the node"""

    _local_nodes: list[tuple[str, int]]
    """the ids of the blocks and their local node present at the global node"""

    _boundary_conditions: list[BoundaryCondition]
    """boundary conditions at the node (empty if the node is not a boundary)"""

    def __init__(self, node_id: str) -> None:
        self._unique_id = node_id
        self._dofs = []
        self._local_nodes = []
        self._boundary_conditions = []

    @property
    def name(self) -> str:
        """
        Get the name of the node in the net.

        :return: the node name
        :rtype: str
        """
        return self._unique_id

    def has_flux_type(self, flux_type: str) -> bool:
        """
        Check if the flux type is defined at the node.

        A flux type is defined at the node when one of the node DOFs is
        coupled with it in the flux type register.

        :param flux_type: the flux type to look for.
        :type flux_type: str

        :return: True if the flux type is accepted, False otherwise
        :rtype: bool
        """
        return any(
            _flux_type_register.dof_flux_couples[dof.dof_type] == flux_type
            for dof in self._dofs
        )

    def add_dof(self, dof_id: str, dof_type: str) -> None:
        """
        Create a :class:`~physioblocks.description.flux.Dof` object of the given
        type at the node.

        :param dof_id: the dof id.
        :type dof_id: str

        :param dof_type: the DOF type.
        :type dof_type: str

        :raise ValueError: raises a ValueError if the Dof type is not registered.
        """
        if dof_type not in _flux_type_register.dof_flux_couples:
            raise ValueError(
                f"Can not create a Dof with unregister dof type {dof_type}"
            )

        self._dofs.append(Dof(dof_id, dof_type))

    def remove_dof(self, dof_type: str) -> None:
        """
        Remove the first DOF of the given type.

        Nothing happens when no DOF of this type exists at the node.

        :param dof_type: the DOF type.
        :type dof_type: str
        """
        for index, dof in enumerate(self._dofs):
            if dof.dof_type == dof_type:
                # Delete by position so exactly the matched DOF is removed.
                del self._dofs[index]
                break

    @property
    def dofs(self) -> list[Dof]:
        """
        Get all DOFs at the node.

        :return: a copy of the list of the DOFs at the node.
        :rtype: list[Dof]
        """
        return list(self._dofs)

    def get_flux_dof(self, flux_type: str) -> Dof:
        """
        Get the DOF matching the flux type.

        :param flux_type: the flux type.
        :type flux_type: str

        :raise KeyError: Error raised when no DOF at the node is coupled with
          the given flux type.

        :return: the DOF
        :rtype: Dof
        """
        for dof in self._dofs:
            if _flux_type_register.dof_flux_couples[dof.dof_type] == flux_type:
                return dof

        raise KeyError(f"No dof matching the flux type {flux_type}")

    def get_dof(self, dof_id: str) -> Dof:
        """
        Get a DOF with the given id.

        :param dof_id: id of the DOF to get
        :type dof_id: str

        :raise KeyError: Error raised when there are no DOFs with the given id at
          the node.

        :return: the DOF if it exists
        :rtype: Dof
        """
        for dof in self._dofs:
            if dof.dof_id == dof_id:
                return dof
        raise KeyError(f"{dof_id} is not defined at node {self.name}.")

    @property
    def is_boundary(self) -> bool:
        """
        Check if the node defines a boundary of the net.

        :return: True if the node holds at least one boundary condition,
          False otherwise
        :rtype: bool
        """
        return bool(self._boundary_conditions)

    @property
    def boundary_conditions(self) -> list[BoundaryCondition]:
        """
        Get the boundary conditions at the node.

        :return: a copy of the list of boundary conditions at the node
        :rtype: list[BoundaryCondition]
        """
        return list(self._boundary_conditions)

    def add_boundary_condition(
        self, condition_type: str, parameter_id: str
    ) -> BoundaryCondition:
        """
        Add a boundary condition at the node.

        Exactly one DOF of the node must match the condition type, either by
        its dof type or by the flux type coupled with it. When the condition
        is set on the dof type (potential), the matching DOF is renamed with
        the parameter id.

        :param condition_type: the flux or potential type
        :type condition_type: str

        :param parameter_id: the condition parameter global name
        :type parameter_id: str

        :raise ValueError: Raises a ValueError when no DOF or flux type is
          matching the condition type, when several DOFs match it, or when a
          matching type already has a boundary condition

        :return: the created boundary condition
        :rtype: BoundaryCondition
        """
        dof_flux_couples = _flux_type_register.dof_flux_couples
        matching_dofs = [
            dof
            for dof in self._dofs
            if condition_type in (dof.dof_type, dof_flux_couples[dof.dof_type])
        ]
        if not matching_dofs:
            raise ValueError(
                f"There are no potential or flux matching type {condition_type}"
                f" at node {self.name}."
            )
        if len(matching_dofs) > 1:
            # Report ambiguity separately from "no match".
            raise ValueError(
                f"There are multiple potentials or fluxes matching type"
                f" {condition_type} at node {self.name}."
            )

        dof = matching_dofs[0]
        is_flux_condition = condition_type == dof_flux_couples[dof.dof_type]

        self._check_existing_condition(condition_type, is_flux_condition)

        bc = BoundaryCondition(condition_type, parameter_id)
        self._boundary_conditions.append(bc)

        # In the case of a potential: rename the dof
        if not is_flux_condition:
            dof.dof_id = parameter_id

        return bc

    def _check_existing_condition(
        self, condition_type: str, is_flux_condition: bool
    ) -> None:
        """
        Raise a ValueError if a condition on the given type, or on the type
        coupled with it in the flux type register, already exists at the node.
        """
        if not self._boundary_conditions:
            # Nothing to compare with: skip the register lookup entirely.
            return

        if is_flux_condition:
            coupled_type = _flux_type_register.flux_dof_couples[condition_type]
        else:
            coupled_type = _flux_type_register.dof_flux_couples[condition_type]
        conflicting_types = (condition_type, coupled_type)

        if any(
            condition.condition_type in conflicting_types
            for condition in self._boundary_conditions
        ):
            raise ValueError(
                f"A boundary condition on {condition_type} is already added"
                f" at node {self.name}."
            )

    def remove_boundary_condition(self, condition_type: str) -> None:
        """
        Remove the first boundary condition of the matching type.

        Nothing happens when no condition of this type exists at the node.

        :param condition_type: the flux or potential type
        :type condition_type: str
        """
        for boundary in self._boundary_conditions:
            if boundary.condition_type == condition_type:
                self._boundary_conditions.remove(boundary)
                return

    @property
    def local_nodes(self) -> list[tuple[str, int]]:
        """
        Get all the local nodes.

        :return: a copy of the list of (block id, local node index) couples
        :rtype: list[tuple[str, int]]
        """
        return list(self._local_nodes)

    def add_node_local(self, block_id: str, block_node_index: int) -> None:
        """
        Add a block local node to the global node.

        :param block_id: the block id in the net.
        :type block_id: str

        :param block_node_index: the local node index in the block
        :type block_node_index: int
        """
        self._local_nodes.append((block_id, block_node_index))

    def remove_node_local(self, block_id: str, block_node_index: int) -> None:
        """
        Remove a block local node from the global node.

        :param block_id: the block id in the net.
        :type block_id: str

        :param block_node_index: the local node index in the block
        :type block_node_index: int

        :raise ValueError: Raises a ValueError when the local node is not
          linked to this node.
        """
        self._local_nodes.remove((block_id, block_node_index))

    def has_node_local(self, block_id: str, block_node_index: int) -> bool:
        """
        Check if a local node is linked to this global node.

        :param block_id: the block id in the net.
        :type block_id: str

        :param block_node_index: the local node index in the block
        :type block_node_index: int

        :return: True if the block local node is linked to this node, False otherwise
        :rtype: bool
        """
        return (block_id, block_node_index) in self._local_nodes

377 

378 

@register_type(NET_TYPE_ID)
class Net:
    """
    The **Net** stores the **Blocks** and links them with **nodes**.

    It allows to create the global system.

    * Internal Equations of the blocks and their submodels are concatenated to the
      residual.
    * The fluxes shared at each node are summed by flux type and concatenated to
      the global system.
    """

    _blocks: dict[str, BlockDescription]
    """the collection of blocks in the net"""
    _nodes: dict[str, Node]
    """the collection of nodes in the net"""

    def __init__(self) -> None:
        self._nodes = {}
        self._blocks = {}

    @property
    def blocks(self) -> dict[str, BlockDescription]:
        """
        Get the :class:`~physioblocks.description.blocks.BlockDescription`
        objects in the net.

        :return: a copy of the mapping of block ids to block descriptions.
        :rtype: dict[str, BlockDescription]
        """
        return dict(self._blocks)

    @property
    def nodes(self) -> dict[str, Node]:
        """
        Get the :class:`~.Node` objects in the net.

        :return: a copy of the mapping of node ids to nodes.
        :rtype: dict[str, Node]
        """
        return dict(self._nodes)

    @property
    def boundary_conditions(self) -> dict[str, list[BoundaryCondition]]:
        """
        Get :class:`~.BoundaryCondition` objects in the net with their matching
        node name.

        Only the nodes holding at least one boundary condition are listed.

        :return: the net boundaries conditions
        :rtype: dict[str, list[BoundaryCondition]]
        """
        return {
            node_id: node.boundary_conditions
            for node_id, node in self._nodes.items()
            if node.is_boundary
        }

    def __str__(self) -> str:
        blocks_summary = {
            block_id: block.described_type.__name__
            for block_id, block in self._blocks.items()
        }

        nodes_summary: dict[str, Any] = {}
        for node_id, node in self._nodes.items():
            # the block ids at the node and the flux index they share
            node_summary: dict[str, Any] = {
                block_id: "flux " + str(flux_index)
                for block_id, flux_index in node.local_nodes
            }
            if node.is_boundary:
                node_summary["Boundary Conditions"] = {
                    bc.condition_id: bc.condition_type
                    for bc in node.boundary_conditions
                }
            nodes_summary[node_id] = node_summary

        net_dict: dict[str, Any] = {"Blocks": blocks_summary, "Nodes": nodes_summary}
        return pformat(net_dict, indent=2, compact=False)

    def add_node(self, node_id: str) -> Node:
        """
        Add a new node to the net.

        :param node_id: The node id.
        :type node_id: str

        :raise ValueError: Raise a ValueError if a node with the same id is already in
          the net.

        :return: the added node
        :rtype: Node
        """
        if node_id in self._nodes:
            raise ValueError(f"There is already a node with id {node_id} in the net")

        node = Node(node_id)
        self._nodes[node_id] = node
        return node

    def remove_node(self, node_id: str) -> None:
        """
        Remove a node from the net.

        .. note::

            It also removes all the blocks linked to the node because their dofs no
            longer exist.

        :param node_id: The node id.
        :type node_id: str

        :raise KeyError: Raises a KeyError if there is no node with the given id.
        """
        node = self._nodes[node_id]

        # A block can be linked to the node through several of its local nodes:
        # de-duplicate the ids (keeping order) so each block is removed once.
        linked_block_ids = dict.fromkeys(block_id for block_id, _ in node.local_nodes)
        for block_id in linked_block_ids:
            self.remove_block(block_id)

        # Actually remove the node
        self._nodes.pop(node_id)

    def add_block(
        self,
        block_local_id: str,
        block_description: BlockDescription,
        node_ids: dict[int, str],
    ) -> BlockDescription:
        """
        Add a block description in the net.

        The method returns a copy of the block updated with correct global and dofs ids
        in the net.

        :param block_local_id: the id of the block in the net
        :type block_local_id: str

        :param block_description: the block to add
        :type block_description: BlockDescription

        :param node_ids: a mapping of local node indexes in the block to global nodes
          names in the net.
        :type node_ids: dict[int, str]

        :raise ValueError: Exception raised if there is already a block with the
          given id, or if the number of linked nodes does not match the number
          of local nodes of the block.
        :raise KeyError: Exception raised if a local node index is missing from
          ``node_ids`` or if a linked node is not in the net.

        :return: the added block description
        :rtype: BlockDescription

        .. note::

            When adding a block to a net, every of its flux should be linked to a node.
        """
        if block_local_id in self._blocks:
            raise ValueError(
                f"Block with id {block_local_id} is already defined in the net."
            )

        described_type = block_description.described_type
        if len(described_type.nodes) != len(node_ids):
            raise ValueError(
                f"Linked node ids list and {block_local_id} local nodes list"
                " size mismatch."
            )

        flux_type = block_description.flux_type

        # link block local node to global node.
        dof_ids = {}
        for node_index, flux_def in described_type.fluxes_expressions.items():
            global_node_id = node_ids[node_index]
            global_node = self._nodes[global_node_id]

            dof_type = _flux_type_register.flux_dof_couples[flux_type]
            new_dof_id = ID_SEPARATOR.join([global_node_id, dof_type])

            # get matching local parameter in the block
            local_dof_id = flux_def.get_term(0).term_id
            if local_dof_id in described_type.local_ids:
                dof_ids[local_dof_id] = new_dof_id
            # else the dof is not used in the model
            # (it is only described to use in submodels)

            # create new dof if necessary
            if all(dof.dof_type != dof_type for dof in global_node.dofs):
                global_node.add_dof(new_dof_id, dof_type)

            # Register the local node under the id the block is stored with in
            # the net, so remove_block and the dof clean-up can find it again.
            global_node.add_node_local(block_local_id, node_index)

        # Update global ids for the block with dof ids. Work on a copy so the
        # caller's description is left untouched whether or not ``global_ids``
        # already returns a copy.
        new_global_ids = dict(block_description.global_ids)
        new_global_ids.update(dof_ids)

        # create and save the block description
        added_block = BlockDescription(
            block_local_id,
            described_type,
            flux_type,
            new_global_ids,
            block_description.submodels,
        )
        self._blocks[block_local_id] = added_block

        return added_block

    def remove_block(self, block_id: str) -> None:
        """
        Remove a block from the net.

        Also removes the dofs at the global nodes that no longer exist when this
        block is deleted (the dofs that are not linked to any block when the block
        is removed).

        :param block_id: the block id to remove
        :type block_id: str

        :raise KeyError: Raises a KeyError if there is no block with the given id.
        """
        # Remove the block
        self._blocks.pop(block_id)

        # Remove the block from nodes local indexes. ``local_nodes`` is a copy,
        # so the node can be modified while iterating over it.
        for node in self._nodes.values():
            for node_block_id, node_local_index in node.local_nodes:
                if node_block_id == block_id:
                    node.remove_node_local(node_block_id, node_local_index)

        # If no blocks still links to the block global nodes,
        # remove the dof at this nodes.
        self.__clean_unlinked_dofs()

    def __clean_unlinked_dofs(self) -> None:
        """
        Remove, at every node, the dofs whose flux type is not shared by any of
        the blocks still linked to the node.
        """
        for node in self._nodes.values():
            blocks_at_node = [local_node[0] for local_node in node.local_nodes]
            dof_types_to_remove = []
            for dof in node.dofs:
                dof_flux_type = _flux_type_register.dof_flux_couples[dof.dof_type]
                if not any(
                    self._blocks[block_id].flux_type == dof_flux_type
                    for block_id in blocks_at_node
                ):
                    dof_types_to_remove.append(dof.dof_type)

            for dof_type in dof_types_to_remove:
                node.remove_dof(dof_type)

    def local_to_global_node_id(self, block_id: str, index_block_node: int) -> str:
        """
        Get the id of the global node linked to the given local node.

        :param block_id: the id of the block in the net
        :type block_id: str

        :param index_block_node: the index of the local node in the block
        :type index_block_node: int

        :raise ValueError:
          Raise a ValueError if no global node is linked to the given local node.

        :return: the global node name
        :rtype: str
        """
        for node_id, node in self._nodes.items():
            if node.has_node_local(block_id, index_block_node):
                return node_id
        raise ValueError(
            "No Global Node is linked to the given local node"
            f" ({block_id}:{index_block_node})"
        )

    def set_boundary(
        self, node_id: str, condition_type: str, parameter_id: str
    ) -> None:
        """
        Set a :class:`~.BoundaryCondition` object in the net.

        When the condition is set on a dof type and the parameter id differs
        from the current dof id, the dof id is renamed in all the blocks of the
        net and their submodels.

        :param node_id: the id of the node where to set the boundary condition
        :type node_id: str

        :param condition_type: the flux or dof type of the condition.
        :type condition_type: str

        :param parameter_id: the condition matching parameter id
        :type parameter_id: str

        :raise KeyError: Raises a KeyError if there is no node with the given id.
        :raise ValueError: Raises a ValueError if zero or several dofs of the
          node match the condition type.
        """
        node = self._nodes[node_id]
        dof_flux_couples = _flux_type_register.dof_flux_couples
        matching_dofs = [
            dof
            for dof in node.dofs
            if condition_type in (dof.dof_type, dof_flux_couples[dof.dof_type])
        ]
        if not matching_dofs:
            raise ValueError(
                f"There is no dof matching condition_type {condition_type}"
                f" at node {node_id}"
            )
        if len(matching_dofs) > 1:
            raise ValueError(
                f"There are multiple dof matching condition_type {condition_type}"
                f" at node {node_id}"
            )

        # Read the id before adding the condition: the node may rename the dof.
        old_dof_id = matching_dofs[0].dof_id
        bc = node.add_boundary_condition(condition_type, parameter_id)

        if (
            bc.condition_id != old_dof_id
            and bc.condition_type in _flux_type_register.dof_flux_couples
        ):
            # rename potential with the new id in all blocks and models
            for block in self._blocks.values():
                self._rename_block_ids_rec(block, old_dof_id, bc.condition_id)

    def _rename_block_ids_rec(
        self, model: ModelComponentDescription, old: str, new: str
    ) -> None:
        """
        Rename the global id ``old`` to ``new`` in the model, then recursively
        in all its submodels.
        """
        model.rename_global_id(old, new)

        for submodel in model.submodels.values():
            self._rename_block_ids_rec(submodel, old, new)

    def remove_boundary(self, node_id: str, condition_type: str) -> None:
        """
        Remove a boundary condition from the net.

        :param node_id: the node name where the condition is
        :type node_id: str

        :param condition_type: the flux or dof type for the condition
        :type condition_type: str

        :raise KeyError: Raises a KeyError if there is no node with the given id.
        """
        self._nodes[node_id].remove_boundary_condition(condition_type)

745 node = self._nodes[node_id] 

746 node.remove_boundary_condition(condition_type)