Coverage for src/distopf/cim_converter/processors/bus_processor.py: 83%
105 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-09 17:44 -0700
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-09 17:44 -0700
1from distopf.cim_converter.processors.base_processor import BaseProcessor
2from distopf.cim_converter.utils import PhaseUtils
3import cimgraph.data_profile.cimhub_2023 as cim
class BusProcessor(BaseProcessor):
    """Processor for bus/node data.

    Produces one flat ``dict`` record per ``cim.ConnectivityNode`` found in
    ``network.graph``. Each record carries the node identity, per-phase load
    totals divided by ``self.s_base``, a bus type, a voltage base and, when
    available, coordinates taken from equipment attached to the node.

    NOTE(review): ``self.s_base`` and ``self._get_bus_voltage_base`` are not
    defined in this class; presumably they come from ``BaseProcessor`` --
    confirm.
    """

    # Phase keys for which load totals are accumulated; they map one-to-one
    # onto the ``pl_<phase>`` / ``ql_<phase>`` record fields.
    _LOAD_PHASES = ("a", "b", "c", "s1", "s2")

    def process(self, network) -> list[dict]:
        """Process all bus data.

        Args:
            network: Object exposing ``graph``, a mapping from CIM class to a
                mapping of the instances of that class.

        Returns:
            One bus record per connectivity node, skipping any node for which
            ``_process_bus`` returns a falsy value.
        """
        connectivity_nodes = network.graph.get(cim.ConnectivityNode, {})
        return [
            bus_data
            for node in connectivity_nodes.values()
            if (bus_data := self._process_bus(node, network))
        ]

    def _process_bus(self, node, network) -> dict:
        """Process individual bus/connectivity node.

        Args:
            node: Connectivity node; its ``mRID`` and ``name`` are read.
            network: Network whose ``graph`` is searched for attached
                equipment.

        Returns:
            The bus record. Load fields start at 0.0 and the coordinates at
            ``None``; both are then filled in by the helper calls below.
        """
        bus_data = {
            "mrid": node.mRID,
            "id": None,  # To be populated later
            "name": node.name,
            "pl_a": 0.0,
            "ql_a": 0.0,
            "pl_b": 0.0,
            "ql_b": 0.0,
            "pl_c": 0.0,
            "ql_c": 0.0,
            "pl_s1": 0.0,
            "ql_s1": 0.0,
            "pl_s2": 0.0,
            "ql_s2": 0.0,
            "bus_type": self._determine_bus_type(node, network),
            "v_a": 1.0,
            "v_b": 1.0,
            "v_c": 1.0,
            "v_ln_base": self._get_bus_voltage_base(node),
            "s_base": self.s_base,
            "v_min": 0.95,
            "v_max": 1.05,
            "cvr_p": 0,
            "cvr_q": 0,
            "phases": None,
            "latitude": None,
            "longitude": None,
        }
        self._process_bus_loads(node, network, bus_data)
        self._process_bus_location(node, network, bus_data)
        return bus_data

    @staticmethod
    def _terminal_on_node(terminal, node) -> bool:
        """Return True if *terminal* is attached to connectivity node *node*.

        A terminal whose ``ConnectivityNode`` attribute is missing or ``None``
        is treated as not attached instead of raising ``AttributeError``.
        """
        connectivity_node = getattr(terminal, "ConnectivityNode", None)
        if connectivity_node is None:
            return False
        return connectivity_node.mRID == node.mRID

    @staticmethod
    def _to_float_or_none(value) -> float | None:
        """Convert *value* to ``float``; return ``None`` if absent or invalid.

        Only ``None`` counts as absent, so a legitimate zero coordinate is
        kept rather than being discarded by a truthiness test.
        """
        if value is None:
            return None
        try:
            return float(value)
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _to_float_or_zero(value) -> float:
        """Convert *value* to ``float``, mapping any falsy value to 0.0."""
        return float(value) if value else 0.0

    def _process_bus_location(self, node, network, bus_data: dict):
        """Process location data for bus if available.

        Writes ``bus_data["latitude"]`` and ``bus_data["longitude"]`` in
        place; both become ``None`` when no location or coordinate is found.
        """
        location = self._find_node_location(node, network)
        # The extractor already yields (None, None) for a missing location.
        lat, lon = self._extract_coordinates_from_location(location)
        bus_data["latitude"] = lat
        bus_data["longitude"] = lon

    def _find_node_location(self, node, network):
        """Find location associated with this connectivity node.

        Equipment is searched first, in the order listed below; the first
        piece of equipment that has a truthy ``Location`` and a terminal on
        *node* wins. Failing that, every ``cim.Location`` is scanned for a
        power system resource with a terminal on *node*.

        Returns:
            The matching location object, or ``None`` if nothing matches.
        """
        # NOTE: this scans all equipment for every node, so building the bus
        # table costs nodes x equipment terminal comparisons.
        equipment_types = [
            cim.EnergyConsumer,
            cim.PowerElectronicsConnection,
            cim.LinearShuntCompensator,
            cim.ACLineSegment,
            cim.PowerTransformer,
            cim.Switch,
            cim.EnergySource,
        ]

        for equipment_type in equipment_types:
            for equipment in network.graph.get(equipment_type, {}).values():
                location = getattr(equipment, "Location", None)
                if not location:
                    continue  # Nothing to return even if it is connected.
                terminals = getattr(equipment, "Terminals", None) or ()
                if any(self._terminal_on_node(t, node) for t in terminals):
                    return location

        # Also check if any Location directly references this node
        for location in network.graph.get(cim.Location, {}).values():
            resources = getattr(location, "PowerSystemResources", None) or ()
            for psr in resources:
                terminals = getattr(psr, "Terminals", None) or ()
                if any(self._terminal_on_node(t, node) for t in terminals):
                    return location

        return None

    def _extract_coordinates_from_location(
        self, location
    ) -> tuple[float | None, float | None]:
        """Extract latitude and longitude from CIM Location object.

        Only the first entry of ``location.PositionPoints`` is used:
        ``yPosition`` is read as latitude and ``xPosition`` as longitude.

        Returns:
            ``(latitude, longitude)``. Each element is ``None`` when the
            location, its position points or that coordinate is missing, or
            when the coordinate cannot be converted to ``float``.
        """
        if not location:
            return None, None

        position_points = getattr(location, "PositionPoints", None)
        if not position_points:
            return None, None

        # Use the first position point
        point = position_points[0]
        lon = self._to_float_or_none(getattr(point, "xPosition", None))
        lat = self._to_float_or_none(getattr(point, "yPosition", None))
        return lat, lon

    def _determine_bus_type(self, node, network) -> str:
        """Determine bus type.

        Returns:
            ``"SWING"`` if any ``cim.EnergySource`` has a terminal on *node*,
            otherwise ``"PQ"``. No other bus type is produced here.
        """
        for source in network.graph.get(cim.EnergySource, {}).values():
            terminals = getattr(source, "Terminals", None) or ()
            if any(self._terminal_on_node(t, node) for t in terminals):
                return "SWING"
        return "PQ"

    def _process_bus_loads(self, node, network, bus_data: dict):
        """Process all loads connected to this bus.

        Sums the load of every ``cim.EnergyConsumer`` whose first terminal is
        on *node*, then writes each phase total divided by ``self.s_base``
        into ``bus_data["pl_<phase>"]`` and ``bus_data["ql_<phase>"]``.
        """
        total_loads = {
            phase: {"p": 0.0, "q": 0.0} for phase in self._LOAD_PHASES
        }

        for consumer in network.graph.get(cim.EnergyConsumer, {}).values():
            terminals = getattr(consumer, "Terminals", None)
            # Only the consumer's first terminal is checked against the node.
            if terminals and self._terminal_on_node(terminals[0], node):
                self._add_consumer_load(consumer, total_loads)

        for phase, load in total_loads.items():
            bus_data[f"pl_{phase}"] = load["p"] / self.s_base
            bus_data[f"ql_{phase}"] = load["q"] / self.s_base

    def _add_consumer_load(self, consumer, total_loads: dict):
        """Add consumer load to total loads.

        Per-phase values from ``consumer.EnergyConsumerPhase`` are used when
        at least one entry resolves to a known phase key. Otherwise the
        consumer's total ``p`` / ``q`` is split equally across its phases.

        Args:
            consumer: Energy consumer to account for.
            total_loads: Mapping ``phase -> {"p": float, "q": float}``,
                updated in place.
        """
        phase_specific = False

        consumer_phases = getattr(consumer, "EnergyConsumerPhase", None) or ()
        for consumer_phase in consumer_phases:
            phase_letter = PhaseUtils.get_phase_str(consumer_phase.phase)
            if phase_letter not in self._LOAD_PHASES:
                continue
            phase_specific = True
            totals = total_loads[phase_letter]
            totals["p"] += self._to_float_or_zero(
                getattr(consumer_phase, "p", None)
            )
            totals["q"] += self._to_float_or_zero(
                getattr(consumer_phase, "q", None)
            )

        if phase_specific:
            return

        total_p = self._to_float_or_zero(getattr(consumer, "p", None))
        total_q = self._to_float_or_zero(getattr(consumer, "q", None))
        if total_p == 0 and total_q == 0:
            return

        all_phases = PhaseUtils.get_equipment_phases(consumer)
        # Filter for standard a,b,c phases before distributing load; default
        # to 3-phase if no standard phases found.
        # NOTE(review): assumes filter_standard_phases yields only keys that
        # exist in total_loads -- confirm against PhaseUtils.
        dist_phases = PhaseUtils.filter_standard_phases(all_phases) or "abc"

        num_phases = len(dist_phases)
        for phase in dist_phases:
            total_loads[phase]["p"] += total_p / num_phases
            total_loads[phase]["q"] += total_q / num_phases