Coverage for src/distopf/cim_converter/processors/line_processor.py: 73%

62 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-09 17:44 -0700

1import numpy as np 

2from distopf.cim_converter.processors.base_processor import BaseProcessor 

3from distopf.cim_converter.utils import PhaseUtils 

4import cimgraph.data_profile.cimhub_2023 as cim 

5 

6 

7class LineProcessor(BaseProcessor): 

8 """Processor for ACLineSegment objects.""" 

9 

10 def process(self, network) -> list[dict]: 

11 """Process all ACLineSegment objects.""" 

12 results = [] 

13 for line in network.graph.get(cim.ACLineSegment, {}).values(): 

14 results.append(self._process_line(line)) 

15 return results 

16 

17 def _process_line(self, line) -> dict: 

18 """Process individual line segment.""" 

19 data = self._create_base_branch_dict() 

20 

21 # Basic line info 

22 data["name"] = line.name 

23 data["type"] = "ACLineSegment" 

24 

25 # Get terminals 

26 terminals = line.Terminals 

27 from_bus = terminals[0].ConnectivityNode 

28 to_bus = terminals[1].ConnectivityNode 

29 data["from_name"] = from_bus.name 

30 data["to_name"] = to_bus.name 

31 

32 # Get voltage base 

33 v_ln_base = self._get_bus_voltage_base(from_bus) 

34 z_base = v_ln_base**2 / self.s_base 

35 data["v_ln_base"] = v_ln_base 

36 data["z_base"] = z_base 

37 data["s_base"] = self.s_base 

38 

39 # Process impedance 

40 self._process_line_impedance(line, data, z_base) 

41 

42 # Process phases using utility function 

43 data["phases"] = PhaseUtils.get_equipment_phases(line) 

44 

45 # Length 

46 data["length"] = float(line.length) 

47 

48 return data 

49 

50 def _process_line_impedance(self, line, data: dict, z_base: float): 

51 """Process line impedance matrix.""" 

52 if not hasattr(line.PerLengthImpedance, "PhaseImpedanceData"): 

53 self._process_line_impedance_no_phases(line, data, z_base) 

54 return 

55 # raise AttributeError(f"PhaseImpedanceData not found for line {line.name}: {line.pprint()}") 

56 if line.PerLengthImpedance.PhaseImpedanceData is None: 

57 self._process_line_impedance_no_phases(line, data, z_base) 

58 return 

59 # raise AttributeError(f"PhaseImpedanceData not found for line {line.name}: {line.pprint()}") 

60 

61 length = float(line.length) 

62 possible_phases = np.array(["a", "b", "c"]) 

63 

64 # Initialize impedance matrix 

65 for combo in ["aa", "ab", "ac", "bb", "bc", "cc"]: 

66 data[f"r{combo}"] = 0.0 

67 data[f"x{combo}"] = 0.0 

68 

69 # Process phase impedance data 

70 for impedance_data in line.PerLengthImpedance.PhaseImpedanceData: 

71 row = int(impedance_data.row) - 1 

72 col = int(impedance_data.column) - 1 

73 ph = "".join(possible_phases[sorted([row, col])]) 

74 

75 real = length * float(impedance_data.r) / z_base 

76 imag = length * float(impedance_data.x) / z_base 

77 

78 data[f"r{ph}"] = real 

79 data[f"x{ph}"] = imag 

80 

81 def _process_line_impedance_no_phases(self, line, data: dict, z_base: float): 

82 length = float(line.length) 

83 r = line.r 

84 x = line.x 

85 if not r: 

86 r = 0 # probably using WirePositions instead 

87 # raise AttributeError(f"r not found for line {line.name}") 

88 if not x: 

89 x = 0 

90 # raise AttributeError(f"r not found for line {line.name}") 

91 # Initialize impedance matrix 

92 for combo in ["aa", "ab", "ac", "bb", "bc", "cc"]: 

93 data[f"r{combo}"] = 0.0 

94 data[f"x{combo}"] = 0.0 

95 for combo in ["aa", "bb", "cc"]: 

96 data[f"r{combo}"] = length * float(r) / z_base 

97 data[f"x{combo}"] = length * float(x) / z_base