Coverage for src/distopf/cim_importer/processors/bus_processor.py: 83%

105 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1from distopf.cim_importer.processors.base_processor import BaseProcessor 

2from distopf.cim_importer.utils import PhaseUtils 

3import cimgraph.data_profile.cimhub_2023 as cim 

4 

5 

6class BusProcessor(BaseProcessor): 

7 """Processor for bus/node data.""" 

8 

9 def process(self, network) -> list[dict]: 

10 """Process all bus data.""" 

11 results = [] 

12 connectivity_nodes = network.graph.get(cim.ConnectivityNode, {}) 

13 

14 for node in connectivity_nodes.values(): 

15 bus_data = self._process_bus(node, network) 

16 if bus_data: 

17 results.append(bus_data) 

18 

19 return results 

20 

21 def _process_bus(self, node, network) -> dict: 

22 """Process individual bus/connectivity node.""" 

23 bus_data = { 

24 "mrid": node.mRID, 

25 "id": None, # To be populated later 

26 "name": node.name, 

27 "pl_a": 0.0, 

28 "ql_a": 0.0, 

29 "pl_b": 0.0, 

30 "ql_b": 0.0, 

31 "pl_c": 0.0, 

32 "ql_c": 0.0, 

33 "pl_s1": 0.0, 

34 "ql_s1": 0.0, 

35 "pl_s2": 0.0, 

36 "ql_s2": 0.0, 

37 "bus_type": "PQ", 

38 "v_a": 1.0, 

39 "v_b": 1.0, 

40 "v_c": 1.0, 

41 "v_ln_base": None, 

42 "s_base": self.s_base, 

43 "v_min": 0.95, 

44 "v_max": 1.05, 

45 "cvr_p": 0, 

46 "cvr_q": 0, 

47 "phases": None, 

48 "latitude": None, 

49 "longitude": None, 

50 } 

51 

52 bus_data["bus_type"] = self._determine_bus_type(node, network) 

53 bus_data["v_ln_base"] = self._get_bus_voltage_base(node) 

54 self._process_bus_loads(node, network, bus_data) 

55 self._process_bus_location(node, network, bus_data) 

56 return bus_data 

57 

58 def _process_bus_location(self, node, network, bus_data: dict): 

59 """Process location data for bus if available.""" 

60 location = self._find_node_location(node, network) 

61 if location: 

62 lat, lon = self._extract_coordinates_from_location(location) 

63 bus_data["latitude"] = lat 

64 bus_data["longitude"] = lon 

65 else: 

66 bus_data["latitude"] = None 

67 bus_data["longitude"] = None 

68 

69 def _find_node_location(self, node, network): 

70 """Find location associated with this connectivity node through connected equipment.""" 

71 # Check equipment types that might be connected to this node 

72 equipment_types = [ 

73 cim.EnergyConsumer, 

74 cim.PowerElectronicsConnection, 

75 cim.LinearShuntCompensator, 

76 cim.ACLineSegment, 

77 cim.PowerTransformer, 

78 cim.Switch, 

79 cim.EnergySource, 

80 ] 

81 

82 for equipment_type in equipment_types: 

83 if equipment_type in network.graph: 

84 for equipment in network.graph[equipment_type].values(): 

85 if hasattr(equipment, "Terminals") and equipment.Terminals: 

86 for terminal in equipment.Terminals: 

87 if ( 

88 hasattr(terminal, "ConnectivityNode") 

89 and terminal.ConnectivityNode.mRID == node.mRID 

90 ): 

91 # Found equipment connected to this node 

92 if ( 

93 hasattr(equipment, "Location") 

94 and equipment.Location 

95 ): 

96 return equipment.Location 

97 

98 # Also check if any Location directly references this node 

99 if cim.Location in network.graph: 

100 for location in network.graph[cim.Location].values(): 

101 if ( 

102 hasattr(location, "PowerSystemResources") 

103 and location.PowerSystemResources 

104 ): 

105 for psr in location.PowerSystemResources: 

106 if hasattr(psr, "Terminals") and psr.Terminals: 

107 for terminal in psr.Terminals: 

108 if ( 

109 hasattr(terminal, "ConnectivityNode") 

110 and terminal.ConnectivityNode.mRID == node.mRID 

111 ): 

112 return location 

113 

114 return None 

115 

116 def _extract_coordinates_from_location( 

117 self, location 

118 ) -> tuple[float | None, float | None]: 

119 """Extract latitude and longitude from CIM Location object.""" 

120 if not location or not hasattr(location, "PositionPoints"): 

121 return None, None 

122 

123 position_points = location.PositionPoints 

124 if not position_points: 

125 return None, None 

126 

127 # Use the first position point 

128 point = position_points[0] 

129 

130 # Extract coordinates - these might be in different attributes 

131 lat = None 

132 lon = None 

133 

134 if hasattr(point, "xPosition") and point.xPosition: 

135 try: 

136 lon = float(point.xPosition) 

137 except (ValueError, TypeError): 

138 pass 

139 

140 if hasattr(point, "yPosition") and point.yPosition: 

141 try: 

142 lat = float(point.yPosition) 

143 except (ValueError, TypeError): 

144 pass 

145 

146 return lat, lon 

147 

148 def _determine_bus_type(self, node, network) -> str: 

149 """Determine bus type (SWING, PV, PQ).""" 

150 for source in network.graph.get(cim.EnergySource, {}).values(): 

151 if hasattr(source, "Terminals") and source.Terminals: 

152 for terminal in source.Terminals: 

153 if terminal.ConnectivityNode.mRID == node.mRID: 

154 return "SWING" 

155 return "PQ" 

156 

157 def _process_bus_loads(self, node, network, bus_data: dict): 

158 """Process all loads connected to this bus.""" 

159 total_loads = { 

160 "a": {"p": 0.0, "q": 0.0}, 

161 "b": {"p": 0.0, "q": 0.0}, 

162 "c": {"p": 0.0, "q": 0.0}, 

163 "s1": {"p": 0.0, "q": 0.0}, 

164 "s2": {"p": 0.0, "q": 0.0}, 

165 } 

166 

167 for consumer in network.graph.get(cim.EnergyConsumer, {}).values(): 

168 if hasattr(consumer, "Terminals") and consumer.Terminals: 

169 if consumer.Terminals[0].ConnectivityNode.mRID == node.mRID: 

170 self._add_consumer_load(consumer, total_loads) 

171 

172 for phase in total_loads.keys(): 

173 bus_data[f"pl_{phase}"] = total_loads[phase]["p"] / self.s_base 

174 bus_data[f"ql_{phase}"] = total_loads[phase]["q"] / self.s_base 

175 

176 def _add_consumer_load(self, consumer, total_loads: dict): 

177 """Add consumer load to total loads.""" 

178 phase_specific = False 

179 

180 if hasattr(consumer, "EnergyConsumerPhase") and consumer.EnergyConsumerPhase: 

181 for consumer_phase in consumer.EnergyConsumerPhase: 

182 phase_letter = PhaseUtils.get_phase_str(consumer_phase.phase) 

183 if phase_letter in ["a", "b", "c", "s1", "s2"]: 

184 phase_specific = True 

185 p = ( 

186 float(consumer_phase.p) 

187 if hasattr(consumer_phase, "p") and consumer_phase.p 

188 else 0.0 

189 ) 

190 q = ( 

191 float(consumer_phase.q) 

192 if hasattr(consumer_phase, "q") and consumer_phase.q 

193 else 0.0 

194 ) 

195 total_loads[phase_letter]["p"] += p 

196 total_loads[phase_letter]["q"] += q 

197 if not phase_specific: 

198 total_p = ( 

199 float(consumer.p) if hasattr(consumer, "p") and consumer.p else 0.0 

200 ) 

201 total_q = ( 

202 float(consumer.q) if hasattr(consumer, "q") and consumer.q else 0.0 

203 ) 

204 

205 if total_p != 0 or total_q != 0: 

206 all_phases = PhaseUtils.get_equipment_phases(consumer) 

207 # Filter for standard a,b,c phases before distributing load 

208 dist_phases = PhaseUtils.filter_standard_phases(all_phases) 

209 

210 if not dist_phases: 

211 dist_phases = ( 

212 "abc" # Default to 3-phase if no standard phases found 

213 ) 

214 

215 num_phases = len(dist_phases) 

216 for phase in dist_phases: 

217 total_loads[phase]["p"] += total_p / num_phases 

218 total_loads[phase]["q"] += total_q / num_phases