Coverage for src/dcm/parser.py: 54%

167 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-05 18:17 +0200

1import re 

2from typing import Any, Callable, Optional, TypeVar, Union 

3 

4from pydantic_yaml import parse_yaml_file_as 

5 

6from . import spec 

7from .models import ( 

8 Config, 

9 Device, 

10 EnvFileInfo, 

11 Extends, 

12 Network, 

13 Port, 

14 RootVolume, 

15 Secret, 

16 Service, 

17 Volume, 

18 VolumeType, 

19 parse_list_of_dict_of_tuples, 

20) 

21 

22 

class Compose:
    """Container for everything collected from one or more compose files.

    Each top-level compose section gets its own attribute, and every one of
    them starts out empty on a fresh instance.
    """

    def __init__(self) -> None:
        # Sections keyed by the name of the entry they describe.
        self.services: dict[str, Service] = {}
        self.networks: dict[str, Network] = {}
        self.volumes: dict[str, RootVolume] = {}
        self.configs: dict[str, Config] = {}
        self.secrets: dict[str, Secret] = {}

        # `include` entries, stored as a plain list.
        self.include: list[spec.Include] = []

31 

32 

def parse_port(source_file: str, port_data: Union[float, str, spec.Ports]) -> Port:
    """Convert one entry of a service's ``ports`` list into a ``Port``.

    Three input shapes are handled:

    * a bare number, used as both host and container port on "0.0.0.0";
    * a short-syntax string ``[HOST_IP:][HOST_PORT:]CONTAINER_PORT[/PROTOCOL]``;
    * a long-syntax ``spec.Ports`` object (``target``, ``published``,
      ``host_ip``).

    When no host port is given the container port is used for it, and when
    no host IP is given "0.0.0.0" is used. The protocol of the short syntax
    is matched but not stored.

    Args:
        source_file: Compose file the port definition comes from.
        port_data: Raw port definition in one of the shapes above.

    Returns:
        The ``Port`` describing the mapping.

    Raises:
        AssertionError: If no container port can be determined.
        RuntimeError: If ``port_data`` has an unsupported type.
    """
    default_host = "0.0.0.0"

    def as_port_str(value: Union[int, float, str]) -> str:
        # Cast floats to int first to avoid values like "8080.0".
        if isinstance(value, float):
            return str(int(value))
        return str(value)

    # bool is a subclass of int but is never a meaningful port number, so it
    # is left to the "unsupported type" error at the bottom.
    if isinstance(port_data, (int, float)) and not isinstance(port_data, bool):
        port = str(int(port_data))
        return Port(source_file, default_host, port, port)

    host_ip: Optional[str]
    host_port: Optional[str]
    container_port: Optional[str]

    if isinstance(port_data, str):
        # NOTE(review): as written, the `host_ip` group includes the trailing
        # ":" of the address. Kept unchanged here; confirm that `Port`
        # expects the host in that form.
        regex = r"((?P<host_ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:|(\$\{([^}]+)\}):)|:|)?((?P<host_port>\d+(\-\d+)?):)?((?P<container_port>\d+(\-\d+)?))?(/(?P<protocol>\w+))?"  # noqa: E501
        match = re.match(regex, port_data)
        if match is None:
            raise RuntimeError("LogicError while parsing port")

        host_ip = match.group("host_ip")
        host_port = match.group("host_port")
        container_port = match.group("container_port")

        # Raised explicitly (instead of `assert`) so the validation is not
        # stripped when Python runs with -O; the exception type is unchanged.
        if not container_port:
            raise AssertionError("Invalid port format, aborting.")
    elif isinstance(port_data, spec.Ports):
        if port_data.target is None:
            raise AssertionError("Invalid port format, aborting.")

        # Always bind both ports: previously a missing `published` or a
        # non-int `target` left the local unassigned and crashed below.
        container_port = as_port_str(port_data.target)
        host_port = (
            None
            if port_data.published is None
            else as_port_str(port_data.published)
        )
        host_ip = port_data.host_ip
    else:
        raise RuntimeError("LogicError while parsing port")

    if host_port is None:
        host_port = container_port

    return Port(
        source_file=source_file,
        host=host_ip if host_ip is not None else default_host,
        source_port=host_port,
        container_port=container_port,
    )

92 

93 

def _unwrap_depends_on(
    data_depends_on: Union[spec.ListOfStrings, dict[Any, spec.DependsOn], None],
) -> dict[str, spec.DependsOn]:
    """Normalise a service's ``depends_on`` value to a name-keyed mapping.

    A list of service names is expanded so that each name maps to a
    ``spec.DependsOn`` with ``restart=True``, ``required=True`` and the
    ``service_started`` condition. A dict is returned unchanged, and any
    other value (including ``None``) yields an empty dict.
    """
    if isinstance(data_depends_on, spec.ListOfStrings):
        expanded: dict[str, spec.DependsOn] = {}
        for dependency_name in data_depends_on.root:
            expanded[dependency_name] = spec.DependsOn(
                restart=True,
                required=True,
                condition=spec.Condition.service_started,
            )
        return expanded

    return data_depends_on if isinstance(data_depends_on, dict) else {}

107 

108 

# Type variables used to annotate the section-merging helper nested inside
# `Parser.merge`:
# - TypeAspec is the second argument type of the `constructor` callable
#   (the raw value taken from the parsed compose section);
# - TypeAPrime is what `constructor` returns and what the target dict stores.
TypeAspec = TypeVar("TypeAspec")
TypeAPrime = TypeVar("TypeAPrime")

111 

112 

class Parser:
    """Translate parsed compose specifications into this package's models.

    The parser keeps no state of its own; ``merge`` folds one parsed compose
    file into an accumulating ``Compose`` object.
    """

    def __init__(self) -> None:
        """Create a parser; there is nothing to initialise."""

    @staticmethod
    def _describe_image(service_data: spec.Service) -> Optional[str]:
        """Build the textual image description from ``build`` and ``image``."""
        description: Optional[str] = None

        build = service_data.build
        if isinstance(build, str):
            description = f"build from '{build}'"
        elif isinstance(build, spec.Build) and build.context is not None:
            if build.dockerfile is not None:
                description = (
                    f"build from '{build.context}' "
                    f"using '{build.dockerfile}'"
                )
            else:
                description = f"build from '{build.context}'"

        if service_data.image is not None:
            if description is not None:
                description += ", image: " + service_data.image
            else:
                description = service_data.image
        return description

    @staticmethod
    def _parse_networks(service_data: spec.Service) -> list[str]:
        """Return the names of the networks a service is attached to."""
        networks = service_data.networks
        if isinstance(networks, spec.ListOfStrings):
            # Copy so the result does not alias the list owned by the spec.
            return list(networks.root)
        if isinstance(networks, dict):
            return list(networks)
        return []

    @staticmethod
    def _parse_extends(service_data: spec.Service) -> Optional[Extends]:
        """Convert the ``extends`` entry of a service, if there is one."""
        extends = service_data.extends
        if extends is None:
            return None
        # https://github.com/compose-spec/compose-spec/blob/master/spec.md#extends
        # The value of the extends key MUST be a dictionary.
        # Raised explicitly so the check also runs under `python -O`.
        if not isinstance(extends, spec.Extends):
            raise AssertionError("Invalid extends input, aborting.")
        return Extends(service_name=extends.service, from_file=extends.file)

    @staticmethod
    def _parse_volumes(source_file: str, service_data: spec.Service) -> list[Volume]:
        """Convert short- and long-syntax volume entries of a service."""
        volumes: list[Volume] = []
        for volume_data in service_data.volumes or []:
            if isinstance(volume_data, str):
                # Explicit raise: an `assert` would vanish under `python -O`.
                if ":" not in volume_data:
                    raise AssertionError("Invalid volume input, aborting.")

                split_data = volume_data.split(":")
                if len(split_data) == 2:
                    volumes.append(
                        Volume(
                            source_file=source_file,
                            source=split_data[0],
                            target=split_data[1],
                        )
                    )
                elif len(split_data) == 3:
                    volumes.append(
                        Volume(
                            source_file=source_file,
                            source=split_data[0],
                            target=split_data[1],
                            access_mode=split_data[2],
                        )
                    )
                # Entries with more than two ":" are skipped, as before.
            elif isinstance(volume_data, spec.Volumes):
                if volume_data.target is None:
                    raise AssertionError("Invalid volume input, aborting.")

                # https://github.com/compose-spec/compose-spec/blob/master/spec.md#long-syntax-4
                # `volume_data.source` is not applicable for a tmpfs mount,
                # so fall back to the target. A local is used instead of
                # writing back into `volume_data`, which would silently
                # modify the caller's parsed specification.
                source = (
                    volume_data.source
                    if volume_data.source is not None
                    else volume_data.target
                )
                volumes.append(
                    Volume(
                        source_file=source_file,
                        source=source,
                        target=volume_data.target,
                        v_type=VolumeType[volume_data.type],
                    )
                )
        return volumes

    @staticmethod
    def _parse_env_files(service_data: spec.Service) -> dict[str, EnvFileInfo]:
        """Collect the env files of a service, keyed by their path."""
        env_files: dict[str, EnvFileInfo] = {}
        if service_data.env_file is None:
            return env_files

        raw = service_data.env_file.root
        if isinstance(raw, str):
            env_files[raw] = EnvFileInfo(raw)
        elif isinstance(raw, list):
            for entry in raw:
                if isinstance(entry, str):
                    env_files[entry] = EnvFileInfo(entry)
                elif isinstance(entry, spec.EnvFile1):
                    env_files[entry.path] = EnvFileInfo(
                        path=entry.path, required=entry.required
                    )
        else:
            # Best effort: report the unexpected value and carry on.
            print(f"Invalid env_file data: {raw}")
        return env_files

    @staticmethod
    def _parse_devices(source_file: str, service_data: spec.Service) -> list[Device]:
        """Convert the string device mappings of a service."""
        devices: list[Device] = []
        for device_data in service_data.devices or []:
            if not isinstance(device_data, str):
                # Only the string form is handled; other entries are skipped.
                continue
            # The message used to say "volume" (copy-paste slip); raised
            # explicitly so the check also runs under `python -O`.
            if ":" not in device_data:
                raise AssertionError("Invalid device input, aborting.")

            split_data = device_data.split(":")
            if len(split_data) == 2:
                devices.append(
                    Device(
                        source_file=source_file,
                        host_path=split_data[0],
                        container_path=split_data[1],
                    )
                )
            elif len(split_data) == 3:
                devices.append(
                    Device(
                        source_file=source_file,
                        host_path=split_data[0],
                        container_path=split_data[1],
                        cgroup_permissions=split_data[2],
                    )
                )
        return devices

    def _parse_service(
        self, source_file: str, service_name: str, service_data: spec.Service
    ) -> Service:
        """Convert one service definition into a ``Service`` model.

        Args:
            source_file: Compose file the service was read from.
            service_name: Key of the service in the ``services`` section.
            service_data: Parsed definition of the service.

        Returns:
            The ``Service`` built from ``service_data``.

        Raises:
            AssertionError: If ``extends`` is not a mapping, a string volume
                or device entry contains no ":", a long-syntax volume has no
                target, or a port has no container port.
            RuntimeError: If a port entry has an unsupported type.
        """
        # The sections are evaluated in the same order as before so that the
        # first invalid section reported for a broken file does not change.
        service_image = self._describe_image(service_data)
        service_networks = self._parse_networks(service_data)
        service_extends = self._parse_extends(service_data)
        service_ports = [
            parse_port(source_file=source_file, port_data=port_data)
            for port_data in service_data.ports or []
        ]
        service_depends_on = _unwrap_depends_on(service_data.depends_on)
        service_volumes = self._parse_volumes(source_file, service_data)

        # Lists are copied so the Service never aliases the spec's own lists.
        service_links: list[str] = (
            list(service_data.links) if service_data.links is not None else []
        )
        env_file = self._parse_env_files(service_data)

        # Cast floats to int first to avoid values like "8885.0".
        expose = [
            str(int(port)) if isinstance(port, float) else str(port)
            for port in service_data.expose or []
        ]

        profiles: list[str] = (
            list(service_data.profiles.root)
            if isinstance(service_data.profiles, spec.ListOfStrings)
            else []
        )
        devices = self._parse_devices(source_file, service_data)

        return Service(
            source_file=source_file,
            name=service_name,
            annotations=parse_list_of_dict_of_tuples(service_data.annotations),
            labels=parse_list_of_dict_of_tuples(service_data.labels),
            image=service_image,
            networks=service_networks,
            extends=service_extends,
            ports=service_ports,
            depends_on=service_depends_on,
            volumes=service_volumes,
            links=service_links,
            cgroup_parent=service_data.cgroup_parent,
            container_name=service_data.container_name,
            env_file=env_file,
            expose=expose,
            profiles=profiles,
            devices=devices,
        )

    def merge(
        self, main_compose: Compose, file: str, compose: spec.ComposeSpecification
    ) -> None:
        """Fold one parsed compose file into ``main_compose``.

        Services whose name is already present are combined through
        ``Service.merge``; new ones are added. ``include`` entries are
        appended. For configs, networks, secrets and volumes the first
        definition of a name is kept and later ones are ignored.

        Args:
            main_compose: Accumulator that is updated in place.
            file: Path of the compose file ``compose`` was read from.
            compose: Parsed content of that file.
        """
        for service_name, service_data in (compose.services or {}).items():
            service = self._parse_service(
                source_file=file,
                service_name=service_name,
                service_data=service_data,
            )
            if service_name in main_compose.services:
                main_compose.services[service_name].merge(service)
            else:
                main_compose.services[service_name] = service

        def add_missing(
            into: dict[str, TypeAPrime],
            val: Optional[dict[str, Any]],
            constructor: Callable[[str, TypeAspec], TypeAPrime],
        ) -> None:
            # Add entries of `val` whose key is not yet in `into`; existing
            # keys are left untouched and nothing is constructed for them.
            if val is None:
                return
            for k, v in val.items():
                if k not in into:
                    into[k] = constructor(file, v)

        if compose.include:
            main_compose.include.extend(compose.include)
        add_missing(main_compose.configs, compose.configs, Config)
        add_missing(main_compose.networks, compose.networks, Network)
        add_missing(main_compose.secrets, compose.secrets, Secret)
        add_missing(main_compose.volumes, compose.volumes, RootVolume)

331 

332 

def parse_compose_files(*file_list: str) -> Compose:
    """Load the given compose files and merge them into a single ``Compose``.

    The files are read and merged one after another in the order given,
    all through the same ``Parser`` instance.
    """
    merged = Compose()
    compose_parser = Parser()
    for compose_path in file_list:
        specification = parse_yaml_file_as(spec.ComposeSpecification, compose_path)
        compose_parser.merge(
            main_compose=merged, file=compose_path, compose=specification
        )
    return merged