Coverage for physioblocks / description / nets.py: 99%
209 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-09 16:40 +0100
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-09 16:40 +0100
1# SPDX-FileCopyrightText: Copyright INRIA
2#
3# SPDX-License-Identifier: LGPL-3.0-only
4#
5# Copyright INRIA
6#
7# This file is part of PhysioBlocks, a library mostly developed by the
8# [Ananke project-team](https://team.inria.fr/ananke) at INRIA.
9#
10# Authors:
11# - Colin Drieu
12# - Dominique Chapelle
13# - François Kimmig
14# - Philippe Moireau
15#
16# PhysioBlocks is free software: you can redistribute it and/or modify it under the
17# terms of the GNU Lesser General Public License as published by the Free Software
18# Foundation, version 3 of the License.
19#
20# PhysioBlocks is distributed in the hope that it will be useful, but WITHOUT ANY
21# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
22# PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
23#
24# You should have received a copy of the GNU Lesser General Public License along with
25# PhysioBlocks. If not, see <https://www.gnu.org/licenses/>.
27"""
28Define the :class:`~.Net` object that organises
29:class:`~physioblocks.description.blocks.BlockDescription` and
30:class:`~.Node` to describe the global system.
31"""
33from __future__ import annotations
35from dataclasses import dataclass
36from pprint import pformat
37from typing import Any
39from physioblocks.description.blocks import (
40 ID_SEPARATOR,
41 BlockDescription,
42 ModelComponentDescription,
43)
44from physioblocks.description.flux import Dof, get_flux_dof_register
45from physioblocks.registers.type_register import register_type
# Register of the defined flux types, fetched once when the module is imported.
# The code below reads its ``dof_flux_couples`` mapping (DOF type -> flux type)
# and its ``flux_dof_couples`` mapping (flux type -> DOF type).
_flux_type_register = get_flux_dof_register()

# Type id under which the Net class is registered
NET_TYPE_ID = "net"

# Type id under which the BoundaryCondition class is registered
BOUNDARY_CONDITION_ID = "condition"
# Decorators apply bottom-up: the class is handed to ``register_type`` first
# and to ``dataclass`` afterwards.
@dataclass
@register_type(BOUNDARY_CONDITION_ID)
class BoundaryCondition:
    """
    Describe a boundary condition set at a :class:`~.Node`.

    A condition couples the type it constrains (a DOF type or a flux type, see
    :meth:`~.Node.add_boundary_condition`) with the id of the parameter given
    when the condition is added.
    """

    # Field order matters: it is the positional order of the generated __init__.
    condition_type: str
    """The condition type id"""
    # Receives the ``parameter_id`` passed to Node.add_boundary_condition.
    condition_id: str
    """The name of the parameter"""
class Node:
    """
    A **Global Node** of a :class:`~.Net`.

    A node stores the :class:`~physioblocks.description.flux.Dof` objects and the
    :class:`~.BoundaryCondition` objects defined at its location, with one DOF
    per **Flux Type**.

    For every block sharing a flux at the node, it also records the block name
    in the net together with the local node index of that flux.

    Several **Flux Types** can meet at the same node, but they never mix
    (i.e. a **Flux** is only summed with the other **Fluxes** of the same type).
    Each node therefore adds one equation per **Flux Type** to the
    **Global System**.

    :param node_id: the name of the node in the net
    :type node_id: str
    """

    _unique_id: str
    """the node id in the net"""

    _dofs: list[Dof]
    """the DOFs of the fluxes at the node"""

    _local_nodes: list[tuple[str, int]]
    """the ids of the blocks and their local node present at the global node"""

    _boundary_conditions: list[BoundaryCondition]
    """boundary condition types at the node (empty if the node is not a boundary)"""

    def __init__(self, node_id: str) -> None:
        self._unique_id = node_id
        self._dofs = []
        self._local_nodes = []
        self._boundary_conditions = []

    @property
    def name(self) -> str:
        """
        Name of the node in the net.

        :return: the node name
        :rtype: str
        """
        return self._unique_id

    def has_flux_type(self, flux_type: str) -> bool:
        """
        Tell whether one of the DOFs at the node is coupled to the flux type.

        :param flux_type: the flux type to look for.
        :type flux_type: str

        :return: True if a DOF matches the flux type, False otherwise
        :rtype: bool
        """
        return any(
            flux_type == _flux_type_register.dof_flux_couples[dof.dof_type]
            for dof in self._dofs
        )

    def add_dof(self, dof_id: str, dof_type: str) -> None:
        """
        Append a new :class:`~physioblocks.description.flux.Dof` of the given
        type to the node.

        :param dof_id: the dof id.
        :type dof_id: str

        :param dof_type: the DOF type.
        :type dof_type: str

        :raise ValueError: raised when the DOF type is not registered.
        """
        if dof_type in _flux_type_register.dof_flux_couples:
            self._dofs.append(Dof(dof_id, dof_type))
            return

        raise ValueError(
            "Can not create a Dof with unregister dof type {0}".format(dof_type)
        )

    def remove_dof(self, dof_type: str) -> None:
        """
        Remove the first DOF of the given type, if there is one.

        :param dof_type: the DOF type.
        :type dof_type: str
        """
        target = next((dof for dof in self._dofs if dof.dof_type == dof_type), None)
        if target is not None:
            self._dofs.remove(target)

    @property
    def dofs(self) -> list[Dof]:
        """
        All the DOFs at the node, as a new list.

        :return: all the DOFs at the node.
        :rtype: list[Dof]
        """
        return list(self._dofs)

    def get_flux_dof(self, flux_type: str) -> Dof:
        """
        Get the first DOF coupled to the flux type.

        :param flux_type: the flux type.
        :type flux_type: str

        :raise KeyError: raised when no DOF at the node matches the flux type.

        :return: the DOF
        :rtype: Dof
        """
        for candidate in self._dofs:
            if flux_type == _flux_type_register.dof_flux_couples[candidate.dof_type]:
                return candidate

        raise KeyError("No dof matching the flux type {0}".format(flux_type))

    def get_dof(self, dof_id: str) -> Dof:
        """
        Get the first DOF holding the given id.

        :param dof_id: id of the DOF to get
        :type dof_id: str

        :raise KeyError: raised when there are no DOFs with the given id at
          the node.

        :return: the DOF if it exists
        :rtype: Dof
        """
        for candidate in self._dofs:
            if candidate.dof_id == dof_id:
                return candidate

        raise KeyError("{0} is not defined at node {1}.".format(dof_id, self.name))

    @property
    def is_boundary(self) -> bool:
        """
        Tell whether the node defines a boundary of the net, meaning that it
        holds at least one boundary condition.

        :return: True if the node is a boundary, False otherwise
        :rtype: bool
        """
        return bool(self._boundary_conditions)

    @property
    def boundary_conditions(self) -> list[BoundaryCondition]:
        """
        The boundary conditions added at the node, as a new list.

        :return: the boundary conditions at the node
        :rtype: list[BoundaryCondition]
        """
        return list(self._boundary_conditions)

    def add_boundary_condition(
        self, condition_type: str, parameter_id: str
    ) -> BoundaryCondition:
        """
        Add a boundary condition at the node.

        Exactly one DOF at the node must match the condition type, either
        through its own type or through the flux type it is coupled with.
        When the condition applies to the DOF type (and not to the flux type),
        the matching DOF is renamed with the parameter id.

        :param condition_type: the flux or potential type
        :type condition_type: str

        :param parameter_id: the condition parameter global name
        :type parameter_id: str

        :raise ValueError: raised when the number of DOFs matching the
          condition type is not exactly one, or when the matching type already
          has a boundary condition

        :return: the added boundary condition
        :rtype: BoundaryCondition
        """
        candidates = []
        for dof in self._dofs:
            accepted_types = (
                dof.dof_type,
                _flux_type_register.dof_flux_couples[dof.dof_type],
            )
            if condition_type in accepted_types:
                candidates.append(dof)

        if len(candidates) != 1:
            raise ValueError(
                "There are no potential or flux matching type {0} at node {1}.".format(
                    condition_type,
                    self.name,
                )
            )

        (target_dof,) = candidates
        coupled_flux_type = _flux_type_register.dof_flux_couples[target_dof.dof_type]
        is_flux_condition = condition_type == coupled_flux_type

        self._check_existing_condition(condition_type, is_flux_condition)

        condition = BoundaryCondition(condition_type, parameter_id)
        self._boundary_conditions.append(condition)

        # A condition on the potential replaces the dof id with the parameter id
        if not is_flux_condition:
            target_dof.dof_id = parameter_id

        return condition

    def _check_existing_condition(
        self, condition_type: str, is_flux_condition: bool
    ) -> None:
        # Refuse a second condition on the same type or on its coupled type
        # (the DOF type of a flux type, or the flux type of a DOF type).
        for existing in self._boundary_conditions:
            if is_flux_condition:
                coupled_type = _flux_type_register.flux_dof_couples[condition_type]
            else:
                coupled_type = _flux_type_register.dof_flux_couples[condition_type]

            if existing.condition_type in (condition_type, coupled_type):
                raise ValueError(
                    "A boundary condition on {0} is already added at node {1}.".format(
                        condition_type,
                        self.name,
                    )
                )

    def remove_boundary_condition(self, condition_type: str) -> None:
        """
        Remove the first boundary condition of the given type, if there is one.

        :param condition_type: the flux or potential type
        :type condition_type: str
        """
        for candidate in self._boundary_conditions:
            if candidate.condition_type == condition_type:
                self._boundary_conditions.remove(candidate)
                return

    @property
    def local_nodes(self) -> list[tuple[str, int]]:
        """
        All the local nodes linked to the node, as a new list.

        :return: The list of local nodes
        :rtype: list[tuple[str, int]]
        """
        return list(self._local_nodes)

    def add_node_local(self, block_id: str, block_node_index: int) -> None:
        """
        Link a block local node to the global node.

        :param block_id: the block id in the net.
        :type block_id: str

        :param block_node_index: the local node index in the block
        :type block_node_index: int
        """
        link = (block_id, block_node_index)
        self._local_nodes.append(link)

    def remove_node_local(self, block_id: str, block_node_index: int) -> None:
        """
        Unlink a block local node from the global node.

        :param block_id: the block id in the net.
        :type block_id: str

        :param block_node_index: the local node index in the block
        :type block_node_index: int

        :raise ValueError: raised when the local node is not linked to the node.
        """
        link = (block_id, block_node_index)
        self._local_nodes.remove(link)

    def has_node_local(self, block_id: str, block_node_index: int) -> bool:
        """
        Tell whether a local node is linked to this global node.

        :param block_id: the block id in the net.
        :type block_id: str

        :param block_node_index: the local node index in the block
        :type block_node_index: int

        :return: True if the block local node is linked to this node, False otherwise
        :rtype: bool
        """
        link = (block_id, block_node_index)
        return link in self._local_nodes
@register_type(NET_TYPE_ID)
class Net:
    """
    The **Net** stores the **Blocks** and links them with **nodes**.

    It allows to create the global system.

    * Internal Equations of the blocks and their submodels are concatenated to the
      residual.
    * The fluxes shared at each node are summed by flux type and concatenated to
      the global system.
    """

    _blocks: dict[str, BlockDescription]
    """the collection of blocks in the net"""
    _nodes: dict[str, Node]
    """the collection of nodes in the net"""

    def __init__(self) -> None:
        self._nodes = {}
        self._blocks = {}

    @property
    def blocks(self) -> dict[str, BlockDescription]:
        """
        Get the :class:`~physioblocks.description.blocks.BlockDescription` objects
        in the net, as a new dictionary keyed by block id.

        :return: the blocks descriptions in the net.
        :rtype: dict[str, BlockDescription]
        """
        return self._blocks.copy()

    @property
    def nodes(self) -> dict[str, Node]:
        """
        Get the :class:`~.Node` objects in the net, as a new dictionary keyed by
        node id.

        :return: the nodes in the net.
        :rtype: dict[str, Node]
        """
        return self._nodes.copy()

    @property
    def boundary_conditions(self) -> dict[str, list[BoundaryCondition]]:
        """
        Get :class:`~.BoundaryCondition` objects in the net with their matching
        node name. Nodes without boundary condition are left out.

        :return: the net boundaries conditions
        :rtype: dict[str, list[BoundaryCondition]]
        """
        return {
            node_id: node.boundary_conditions
            for node_id, node in self._nodes.items()
            if node.is_boundary
        }

    def __str__(self) -> str:
        net_dict: dict[str, Any] = {}
        net_dict["Blocks"] = {
            block_id: block.described_type.__name__
            for block_id, block in self._blocks.items()
        }

        # for each node, the list of block ids at the node and the flux index they share
        net_dict["Nodes"] = {
            node_id: {
                block_id: "flux " + str(flux_index)
                for block_id, flux_index in node.local_nodes
            }
            for node_id, node in self._nodes.items()
        }

        # add the boundary conditions of the boundary nodes
        for node_id, node in self._nodes.items():
            if node.is_boundary:
                net_dict["Nodes"][node_id]["Boundary Conditions"] = {
                    bc.condition_id: bc.condition_type
                    for bc in node.boundary_conditions
                }

        return pformat(net_dict, indent=2, compact=False)

    def add_node(self, node_id: str) -> Node:
        """
        Add a new node to the net.

        :param node_id: The node id.
        :type node_id: str

        :raise ValueError: Raise a ValueError if a node with the same id is already in
          the net.

        :return: the added node
        :rtype: Node
        """
        if node_id in self._nodes:
            raise ValueError(
                str.format(
                    "There is already a node with id {0} in the net",
                    node_id,
                )
            )

        node = Node(node_id)
        self._nodes[node_id] = node
        return node

    def remove_node(self, node_id: str) -> None:
        """
        Remove a node from the net.

        .. note::

            It also removes all the blocks linked to the node because their dofs no
            longer exist.

        :param node_id: The node id.
        :type node_id: str

        :raise KeyError: raised when there is no node with the given id.
        """
        node = self._nodes[node_id]

        # A block can be linked to the node through several of its local nodes.
        # De-duplicate the ids (keeping their order) so that each block is only
        # removed once: a second removal would fail on the missing block.
        linked_block_ids = dict.fromkeys(block_id for block_id, _ in node.local_nodes)
        for block_id in linked_block_ids:
            self.remove_block(block_id)

        # Actually remove the node
        self._nodes.pop(node_id)

    def add_block(
        self,
        block_local_id: str,
        block_description: BlockDescription,
        node_ids: dict[int, str],
    ) -> BlockDescription:
        """
        Add a block description in the net.

        The method returns a copy of the block updated with correct global and dofs ids
        in the net. The block is stored and linked to the nodes under
        ``block_local_id``.

        :param block_local_id: the id of the block in the net
        :type block_local_id: str

        :param block_description: the block to add
        :type block_description: BlockDescription

        :param node_ids: a mapping of local node indexes in the block to global nodes
          names in the net.
        :type node_ids: dict[int, str]

        :raise ValueError: Exception raised if there is already a block with the
          given id, or if the number of linked nodes does not match the number of
          local nodes of the block.
        :raise KeyError: raised when a local node index is missing from
          ``node_ids``, when a global node id is not in the net or when the block
          flux type is not registered.

        :return: the added block description
        :rtype: BlockDescription

        .. note::

            When adding a block to a net, every of its flux should be linked to a node.
        """

        if block_local_id in self._blocks:
            raise ValueError(
                str.format(
                    "Block with id {0} is already defined in the net.",
                    block_local_id,
                )
            )

        if len(block_description.described_type.nodes) != len(node_ids):
            raise ValueError(
                str.format(
                    "Linked node ids list and {0} local nodes list size mismatch.",
                    block_local_id,
                )
            )

        # First pass: resolve every node and id without modifying the net, so
        # that a failing lookup does not leave the block partially linked.
        links: list[tuple[Node, int, str, str]] = []
        dof_ids = {}
        for (
            node_index,
            flux_def,
        ) in block_description.described_type.fluxes_expressions.items():
            global_node_id = node_ids[node_index]
            global_node = self._nodes[global_node_id]

            dof_type = _flux_type_register.flux_dof_couples[block_description.flux_type]
            new_dof_id = ID_SEPARATOR.join([global_node_id, dof_type])

            # get matching local parameter in the block
            local_dof_id = flux_def.get_term(0).term_id
            if local_dof_id in block_description.described_type.local_ids:
                dof_ids[local_dof_id] = new_dof_id
            # else the dof is not used in the model
            # (it is only described to use in submodels)

            links.append((global_node, node_index, new_dof_id, dof_type))

        # Second pass: link the block local nodes to the global nodes.
        for global_node, node_index, new_dof_id, dof_type in links:
            # create new dof if necessary
            if all(dof.dof_type != dof_type for dof in global_node.dofs):
                global_node.add_dof(new_dof_id, dof_type)

            # The link uses the id of the block in the net (the key of the
            # blocks collection), which is the id used later to look the block
            # up when blocks are removed.
            global_node.add_node_local(block_local_id, node_index)

        # update global ids for the block with dof ids, working on a copy in
        # case the property exposes the mapping owned by the given description
        new_global_ids = dict(block_description.global_ids)
        new_global_ids.update(dof_ids)

        # create and save the block description
        added_block = BlockDescription(
            block_local_id,
            block_description.described_type,
            block_description.flux_type,
            new_global_ids,
            block_description.submodels,
        )
        self._blocks[block_local_id] = added_block

        return added_block

    def remove_block(self, block_id: str) -> None:
        """
        Remove a block from the net.

        Also removes the dofs at the global nodes that no longer exist when this
        block is deleted (the dofs that are not linked to any block when the block
        is removed).

        :param block_id: the block id to remove
        :type block_id: str

        :raise KeyError: raised when there is no block with the given id.
        """
        # Remove the block
        self._blocks.pop(block_id)

        # remove the block from nodes local indexes
        # (local_nodes is a copy, so unlinking while iterating is safe)
        for node in self._nodes.values():
            for node_block_id, node_local_index in node.local_nodes:
                if node_block_id == block_id:
                    node.remove_node_local(node_block_id, node_local_index)

        # If no blocks still link to the block global nodes,
        # remove the dofs at these nodes.
        self.__clean_unlinked_dofs()

    def __clean_unlinked_dofs(self) -> None:
        # Remove, at every node, the dofs whose flux type is not the flux type
        # of any block still linked to the node.
        for node in self._nodes.values():
            node_dofs = node.dofs
            if not node_dofs:
                continue

            linked_flux_types = [
                self._blocks[block_id].flux_type for block_id, _ in node.local_nodes
            ]
            for dof in node_dofs:
                dof_flux_type = _flux_type_register.dof_flux_couples[dof.dof_type]
                if dof_flux_type not in linked_flux_types:
                    node.remove_dof(dof.dof_type)

    def local_to_global_node_id(self, block_id: str, index_block_node: int) -> str:
        """
        Get the id of the global node linked to the given local node.

        :param block_id: the id of the block in the net
        :type block_id: str

        :param index_block_node: the index of the local node in the block
        :type index_block_node: int

        :raise ValueError:
          Raise a ValueError if no global node is linked to the given local node.

        :return: the global node name
        :rtype: str
        """
        for node_id, node in self._nodes.items():
            if node.has_node_local(block_id, index_block_node):
                return node_id
        raise ValueError(
            str.format(
                "No Global Node is linked to the given local node ({0}:{1})",
                block_id,
                index_block_node,
            )
        )

    def set_boundary(
        self, node_id: str, condition_type: str, parameter_id: str
    ) -> None:
        """
        Set a :class:`~.BoundaryCondition` object in the net.

        When the condition applies to a dof type and the parameter id differs
        from the current dof id, the dof id is renamed with the parameter id in
        every block of the net and in their submodels.

        :param node_id: the id of the node where to set the boundary condition
        :type node_id: str

        :param condition_type: the flux or dof type of the condition.
        :type condition_type: str

        :param parameter_id: the condition matching parameter id
        :type parameter_id: str

        :raise KeyError: raised when there is no node with the given id.
        :raise ValueError: raised when the number of dofs matching the condition
          type at the node is not exactly one, or when the node refuses the
          condition.
        """
        node = self._nodes[node_id]
        matching_dof = [
            dof
            for dof in node.dofs
            if condition_type
            in [
                dof.dof_type,
                _flux_type_register.dof_flux_couples[dof.dof_type],
            ]
        ]
        if len(matching_dof) == 0:
            raise ValueError(
                str.format(
                    "There is no dof matching condition_type {0} at node {1}",
                    condition_type,
                    node_id,
                )
            )
        elif len(matching_dof) > 1:
            raise ValueError(
                str.format(
                    "There are multiple dof matching condition_type {0} at node {1}",
                    condition_type,
                    node_id,
                )
            )

        # Read the id now: adding a condition on the dof type renames the dof.
        old_dof_id = matching_dof[0].dof_id
        bc = node.add_boundary_condition(condition_type, parameter_id)

        if (
            bc.condition_id != old_dof_id
            and bc.condition_type in _flux_type_register.dof_flux_couples
        ):
            # rename the potential with the new id in all blocks and models
            for block in self._blocks.values():
                self._rename_block_ids_rec(block, old_dof_id, bc.condition_id)

    def _rename_block_ids_rec(
        self, model: ModelComponentDescription, old: str, new: str
    ) -> None:
        # Rename the global id in the model, then recursively in its submodels.
        model.rename_global_id(old, new)

        for submodel in model.submodels.values():
            self._rename_block_ids_rec(submodel, old, new)

    def remove_boundary(self, node_id: str, condition_type: str) -> None:
        """
        Remove a boundary condition from the net.

        :param node_id: the node name where the condition is
        :type node_id: str

        :param condition_type: the flux or dof type for the condition
        :type condition_type: str

        :raise KeyError: raised when there is no node with the given id.
        """
        node = self._nodes[node_id]
        node.remove_boundary_condition(condition_type)