Coverage for src/distopf/cim_importer/processors/generator_processor.py: 70%

93 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-13 17:34 -0800

1from distopf.cim_importer.processors.base_processor import BaseProcessor 

2import cimgraph.data_profile.cimhub_2023 as cim 

3from distopf.cim_importer.utils import PhaseUtils 

4 

5 

6class GeneratorProcessor(BaseProcessor): 

7 """Processor for generator objects (PowerElectronicsConnection, EnergySource, etc.).""" 

8 

9 def process(self, network) -> list[dict]: 

10 """Process all generator types.""" 

11 results = [] 

12 # Process PowerElectronicsConnection (PV, Battery, etc.) 

13 results.extend(self._process_power_electronics_connections(network)) 

14 # Process EnergySource (may be generators or sources) 

15 results.extend(self._process_energy_sources(network)) 

16 return results 

17 

18 def _process_power_electronics_connections(self, network) -> list[dict]: 

19 """Process PowerElectronicsConnection objects.""" 

20 results = [] 

21 for pec in network.graph.get(cim.PowerElectronicsConnection, {}).values(): 

22 gen_data = self._process_power_electronics_connection(pec) 

23 if gen_data: 

24 results.append(gen_data) 

25 return results 

26 

27 def _process_power_electronics_connection(self, pec) -> dict | None: 

28 """Process individual PowerElectronicsConnection.""" 

29 if not pec.Terminals or len(pec.Terminals) == 0: 

30 return None 

31 

32 s_base_gen = ( 

33 float(pec.ratedS) if hasattr(pec, "ratedS") and pec.ratedS else self.s_base 

34 ) 

35 

36 bus_name = pec.Terminals[0].ConnectivityNode.name 

37 gen_name = pec.name 

38 

39 # Convert power and limits to system per-unit 

40 total_p_pu = float(pec.p) / self.s_base if hasattr(pec, "p") and pec.p else 0.0 

41 total_q_pu = float(pec.q) / self.s_base if hasattr(pec, "q") and pec.q else 0.0 

42 max_q_pu = ( 

43 (float(pec.maxQ) / self.s_base) 

44 if hasattr(pec, "maxQ") and pec.maxQ 

45 else 0.0 

46 ) 

47 min_q_pu = ( 

48 (float(pec.minQ) / self.s_base) 

49 if hasattr(pec, "minQ") and pec.minQ 

50 else 0.0 

51 ) 

52 rated_s_pu = ( 

53 (float(pec.ratedS) / self.s_base) 

54 if hasattr(pec, "ratedS") and pec.ratedS 

55 else 0.0 

56 ) 

57 

58 gen_data = { 

59 "mrid": pec.mRID, 

60 "id": bus_name, # Use bus name (will be mapped to integer later) 

61 "name": gen_name, 

62 "pa": 0.0, 

63 "pb": 0.0, 

64 "pc": 0.0, 

65 "qa": 0.0, 

66 "qb": 0.0, 

67 "qc": 0.0, 

68 "s_base": s_base_gen, 

69 "sa_max": 0.0, 

70 "sb_max": 0.0, 

71 "sc_max": 0.0, 

72 "phases": "", 

73 "qa_max": 0.0, 

74 "qb_max": 0.0, 

75 "qc_max": 0.0, 

76 "qa_min": 0.0, 

77 "qb_min": 0.0, 

78 "qc_min": 0.0, 

79 "p": total_p_pu, 

80 "q": total_q_pu, 

81 "s_max": rated_s_pu, 

82 "s_min": -rated_s_pu, 

83 "q_max": max_q_pu, 

84 "q_min": min_q_pu, 

85 "ps1": 0.0, 

86 "ps2": 0.0, 

87 "qs1": 0.0, 

88 "qs2": 0.0, 

89 "ss1_max": 0.0, 

90 "ss2_max": 0.0, 

91 "qs1_max": 0.0, 

92 "qs2_max": 0.0, 

93 "qs1_min": 0.0, 

94 "qs2_min": 0.0, 

95 "control_variable": "PQ", 

96 } 

97 

98 # Process phase-specific data 

99 phase_data = {} 

100 active_phases_for_limits = set() 

101 if ( 

102 hasattr(pec, "PowerElectronicsConnectionPhases") 

103 and pec.PowerElectronicsConnectionPhases 

104 ): 

105 phase_conn: cim.PowerElectronicsConnectionPhase 

106 for phase_conn in pec.PowerElectronicsConnectionPhases: 

107 phase_letter = PhaseUtils.get_phase_str(phase_conn.phase) 

108 if not phase_letter: 

109 continue 

110 phase_p = ( 

111 float(phase_conn.p) / self.s_base 

112 if hasattr(phase_conn, "p") and phase_conn.p 

113 else 0.0 

114 ) 

115 phase_q = ( 

116 float(phase_conn.q) / self.s_base 

117 if hasattr(phase_conn, "q") and phase_conn.q 

118 else 0.0 

119 ) 

120 phase_data[phase_letter] = {"p": phase_p, "q": phase_q} 

121 

122 active_phases_for_limits.add(phase_letter) 

123 

124 # Distribute power across phases 

125 if phase_data: 

126 abc_phases, s_phases = set(), set() 

127 for phase_letter, data in phase_data.items(): 

128 if phase_letter in ["s1", "s2"]: 

129 s_phases.add(phase_letter) 

130 if phase_letter in "abc": 

131 abc_phases.add(phase_letter) 

132 gen_data[f"p{phase_letter}"], gen_data[f"q{phase_letter}"] = ( 

133 data["p"], 

134 data["q"], 

135 ) 

136 gen_data["phases"] = ( 

137 f"{''.join(sorted(abc_phases))}{''.join(sorted(s_phases))}" 

138 ) 

139 # else: 

140 # gen_data["pa"] = gen_data["pb"] = gen_data["pc"] = total_p_pu / 3.0 

141 # gen_data["qa"] = gen_data["qb"] = gen_data["qc"] = total_q_pu / 3.0 

142 # gen_data["phases"] = "abc" 

143 # if total_p_pu != 0 or total_q_pu != 0: 

144 # active_phases_for_limits = {"a", "b", "c"} 

145 

146 # Distribute limits (in system p.u.) across active phases 

147 num_phases = len(active_phases_for_limits) 

148 if rated_s_pu > 0 and num_phases > 0: 

149 s_max_pu_per_phase = rated_s_pu / num_phases 

150 q_max_pu_per_phase = max_q_pu / num_phases 

151 q_min_pu_per_phase = min_q_pu / num_phases 

152 for phase_char in active_phases_for_limits: 

153 gen_data[f"s{phase_char}_max"] = s_max_pu_per_phase 

154 gen_data[f"q{phase_char}_max"] = q_max_pu_per_phase 

155 gen_data[f"q{phase_char}_min"] = q_min_pu_per_phase 

156 

157 return gen_data 

158 

159 def _process_energy_sources(self, network) -> list[dict]: 

160 results = [] 

161 for source in network.graph.get(cim.EnergySource, {}).values(): 

162 if hasattr(source, "Terminals") and source.Terminals: 

163 if "sourcebus" in source.Terminals[0].ConnectivityNode.name.lower(): 

164 continue 

165 gen_data = self._process_energy_source(source) 

166 if gen_data: 

167 results.append(gen_data) 

168 return results 

169 

170 def _process_energy_source(self, source) -> dict | None: 

171 if not source.Terminals or len(source.Terminals) == 0: 

172 return None 

173 bus_name = source.Terminals[0].ConnectivityNode.name 

174 

175 gen_data = { 

176 "mrid": source.mRID, 

177 "id": bus_name, # Use bus name (will be mapped to integer later) 

178 "name": f"Generator_{source.mRID[:8]}", 

179 "pa": 0.0, 

180 "pb": 0.0, 

181 "pc": 0.0, 

182 "qa": 0.0, 

183 "qb": 0.0, 

184 "qc": 0.0, 

185 "s_base": self.s_base, 

186 "sa_max": 0.0, 

187 "sb_max": 0.0, 

188 "sc_max": 0.0, 

189 "phases": "abc", 

190 "qa_max": 0.0, 

191 "qb_max": 0.0, 

192 "qc_max": 0.0, 

193 "qa_min": 0.0, 

194 "qb_min": 0.0, 

195 "qc_min": 0.0, 

196 "ps1": 0.0, 

197 "ps2": 0.0, 

198 "qs1": 0.0, 

199 "qs2": 0.0, 

200 "ss1_max": 0.0, 

201 "ss2_max": 0.0, 

202 "qs1_max": 0.0, 

203 "qs2_max": 0.0, 

204 "qs1_min": 0.0, 

205 "qs2_min": 0.0, 

206 "control_variable": "PQ", 

207 } 

208 if hasattr(source, "activePower") and source.activePower: 

209 total_p = float(source.activePower) / self.s_base 

210 gen_data["pa"] = gen_data["pb"] = gen_data["pc"] = total_p / 3.0 

211 if hasattr(source, "reactivePower") and source.reactivePower: 

212 total_q = float(source.reactivePower) / self.s_base 

213 gen_data["qa"] = gen_data["qb"] = gen_data["qc"] = total_q / 3.0 

214 return gen_data 

215 

216 def _get_phase_str(self, phase_code) -> str | None: 

217 phase_str = str(phase_code.value).lower() 

218 if "s1" in phase_str: 

219 return "s1" 

220 elif "s2" in phase_str: 

221 return "s2" 

222 elif "a" in phase_str: 

223 return "a" 

224 elif "b" in phase_str: 

225 return "b" 

226 elif "c" in phase_str: 

227 return "c" 

228 return None