Coverage for src/dcm/parser.py: 54%
167 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 18:17 +0200
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-05 18:17 +0200
1import re
2from typing import Any, Callable, Optional, TypeVar, Union
4from pydantic_yaml import parse_yaml_file_as
6from . import spec
7from .models import (
8 Config,
9 Device,
10 EnvFileInfo,
11 Extends,
12 Network,
13 Port,
14 RootVolume,
15 Secret,
16 Service,
17 Volume,
18 VolumeType,
19 parse_list_of_dict_of_tuples,
20)
class Compose:
    """Accumulator for everything collected from one or more Compose files.

    Every container starts out empty and is filled in by ``Parser.merge``.
    """

    def __init__(self) -> None:
        # ``include`` entries, appended in the order the files are merged.
        self.include: list[spec.Include] = []
        # Parsed services, keyed by the service's key in the compose file.
        self.services: dict[str, Service] = {}
        # Root-level sections, each keyed by the entry's key in the compose file.
        self.configs: dict[str, Config] = {}
        self.networks: dict[str, Network] = {}
        self.secrets: dict[str, Secret] = {}
        self.volumes: dict[str, RootVolume] = {}
# Short-syntax port entry: optional host IP (dotted quad or ``${VAR}``),
# optional host port or range, optional container port or range, optional
# ``/protocol``. Every part is optional, so the pattern matches a prefix of
# any string; the container port is validated separately after matching.
# NOTE(review): the ``host_ip`` group includes the trailing ":" - presumably
# ``Port`` or its consumers account for that; confirm before changing it.
_PORT_SHORT_SYNTAX_RE = re.compile(
    r"((?P<host_ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:|(\$\{([^}]+)\}):)|:|)?((?P<host_port>\d+(\-\d+)?):)?((?P<container_port>\d+(\-\d+)?))?(/(?P<protocol>\w+))?"  # noqa: E501
)


def _port_to_str(value: Any) -> str:
    """Render a port value as text, dropping the fraction of floats (8080.0 -> "8080")."""
    if isinstance(value, float):
        return str(int(value))
    return str(value)


def parse_port(source_file: str, port_data: Union[float, str, spec.Ports]) -> Port:
    """Build a ``Port`` from one entry of a service's ``ports`` list.

    Three input shapes are supported:

    * a bare number: used as both host and container port on host "0.0.0.0";
    * a string: matched against the short-syntax pattern above;
    * a ``spec.Ports`` object: ``target``, ``published`` and ``host_ip`` are read.

    When no host port is given, the container port is used for it; when no
    host IP is given, "0.0.0.0" is used.

    Args:
        source_file: Compose file the entry came from; stored on the result.
        port_data: The raw port entry.

    Returns:
        The corresponding ``Port``.

    Raises:
        AssertionError: If no container (target) port can be determined.
        RuntimeError: If ``port_data`` is none of the supported shapes.
    """
    # ``bool`` is a subclass of ``int`` but is never a port number.
    if isinstance(port_data, (int, float)) and not isinstance(port_data, bool):
        port = str(int(port_data))
        return Port(source_file, "0.0.0.0", port, port)

    host_ip: Optional[str]
    host_port: Optional[str]
    container_port: Optional[str]

    if isinstance(port_data, str):
        match = _PORT_SHORT_SYNTAX_RE.match(port_data)
        if match is None:
            raise RuntimeError("LogicError while parsing port")
        host_ip = match.group("host_ip")
        host_port = match.group("host_port")
        container_port = match.group("container_port")
        # The pattern also captures a "protocol" group; it is currently unused.
    elif isinstance(port_data, spec.Ports):
        host_ip = port_data.host_ip
        # ``published`` is optional in the long syntax; leave the host port
        # unset here so the container-port fallback below applies.
        host_port = (
            None if port_data.published is None else _port_to_str(port_data.published)
        )
        container_port = (
            None if port_data.target is None else _port_to_str(port_data.target)
        )
    else:
        raise RuntimeError("LogicError while parsing port")

    # Raised explicitly (rather than via ``assert``) so the check survives
    # ``python -O`` while keeping the exception type callers already see.
    if not container_port:
        raise AssertionError("Invalid port format, aborting.")

    if host_port is None:
        host_port = container_port

    return Port(
        source_file=source_file,
        host=host_ip if host_ip is not None else "0.0.0.0",
        source_port=host_port,
        container_port=container_port,
    )
def _unwrap_depends_on(
    data_depends_on: Union[spec.ListOfStrings, dict[Any, spec.DependsOn], None],
) -> dict[str, spec.DependsOn]:
    """Normalise a service's ``depends_on`` value into a mapping.

    A ``spec.ListOfStrings`` is expanded into one ``spec.DependsOn`` per listed
    name, each with ``restart=True``, ``required=True`` and the
    ``service_started`` condition. A dict is returned as-is (the same object).
    Anything else, including ``None``, yields an empty dict.
    """
    if isinstance(data_depends_on, spec.ListOfStrings):
        unwrapped: dict[str, spec.DependsOn] = {}
        for dependency in data_depends_on.root:
            unwrapped[dependency] = spec.DependsOn(
                restart=True, required=True, condition=spec.Condition.service_started
            )
        return unwrapped
    return data_depends_on if isinstance(data_depends_on, dict) else {}
# Type variables used to annotate the helper inside ``Parser`` that merges
# root-level sections: ``TypeAspec`` is the spec-side value handed to a
# constructor, ``TypeAPrime`` is the object that constructor produces.
TypeAspec = TypeVar("TypeAspec")
TypeAPrime = TypeVar("TypeAPrime")
class Parser:
    """Convert parsed Compose specifications into this package's model objects."""

    def __init__(self) -> None:
        """Create a parser; it keeps no state of its own."""

    def _parse_service(
        self, source_file: str, service_name: str, service_data: spec.Service
    ) -> Service:
        """Build a ``Service`` model from one entry of the ``services`` section.

        Args:
            source_file: Compose file the service was read from; recorded on
                the service and on the ports, volumes and devices it owns.
            service_name: Key of the service in the ``services`` mapping.
            service_data: Parsed specification of the service.

        Returns:
            The populated ``Service``.

        Raises:
            AssertionError: If ``extends`` is not a ``spec.Extends``, a
                short-syntax volume or device has no ":", or a long-syntax
                volume has no target. ``parse_port`` may raise as well.
        """
        # Sections are evaluated in a fixed order so that, for invalid input,
        # the first error raised is deterministic.
        service_image = self._describe_image(service_data)
        service_networks = self._parse_networks(service_data)
        service_extends = self._parse_extends(service_data)
        service_ports = [
            parse_port(source_file=source_file, port_data=port_data)
            for port_data in service_data.ports or ()
        ]
        service_depends_on = _unwrap_depends_on(service_data.depends_on)
        service_volumes = self._parse_volumes(source_file, service_data)
        service_links: list[str] = (
            service_data.links if service_data.links is not None else []
        )
        env_file = self._parse_env_files(service_data)
        # Floats are cast to int first to avoid values such as "8885.0".
        expose = [
            str(int(port)) if isinstance(port, float) else str(port)
            for port in service_data.expose or ()
        ]
        profiles: list[str] = (
            service_data.profiles.root
            if isinstance(service_data.profiles, spec.ListOfStrings)
            else []
        )
        devices = self._parse_devices(source_file, service_data)

        return Service(
            source_file=source_file,
            name=service_name,
            annotations=parse_list_of_dict_of_tuples(service_data.annotations),
            labels=parse_list_of_dict_of_tuples(service_data.labels),
            image=service_image,
            networks=service_networks,
            extends=service_extends,
            ports=service_ports,
            depends_on=service_depends_on,
            volumes=service_volumes,
            links=service_links,
            cgroup_parent=service_data.cgroup_parent,
            container_name=service_data.container_name,
            env_file=env_file,
            expose=expose,
            profiles=profiles,
            devices=devices,
        )

    @staticmethod
    def _describe_image(service_data: spec.Service) -> Optional[str]:
        """Return a display string combining the ``build`` and ``image`` settings."""
        description: Optional[str] = None
        build = service_data.build
        if isinstance(build, str):
            description = f"build from '{build}'"
        elif isinstance(build, spec.Build):
            if build.context is not None and build.dockerfile is not None:
                description = (
                    f"build from '{build.context}' using '{build.dockerfile}'"
                )
            elif build.context is not None:
                description = f"build from '{build.context}'"

        if service_data.image is not None:
            if description is not None:
                description += ", image: " + service_data.image
            else:
                description = service_data.image
        return description

    @staticmethod
    def _parse_networks(service_data: spec.Service) -> list[str]:
        """Return the network names from either the list or the mapping form."""
        networks = service_data.networks
        if isinstance(networks, spec.ListOfStrings):
            return networks.root
        if isinstance(networks, dict):
            return list(networks)
        return []

    @staticmethod
    def _parse_extends(service_data: spec.Service) -> Optional[Extends]:
        """Return the ``Extends`` model for the service, or ``None`` if absent."""
        extends = service_data.extends
        if extends is None:
            return None
        # https://github.com/compose-spec/compose-spec/blob/master/spec.md#extends
        # The value of the extends key MUST be a dictionary.
        # Raised explicitly so the check is not stripped by ``python -O``.
        if not isinstance(extends, spec.Extends):
            raise AssertionError("The value of the extends key must be a dictionary.")
        return Extends(service_name=extends.service, from_file=extends.file)

    @staticmethod
    def _parse_volumes(source_file: str, service_data: spec.Service) -> list[Volume]:
        """Return the service's volumes from short (string) and long syntax entries."""
        volumes: list[Volume] = []
        for volume_data in service_data.volumes or ():
            if isinstance(volume_data, str):
                if ":" not in volume_data:
                    raise AssertionError("Invalid volume input, aborting.")
                parts = volume_data.split(":")
                # NOTE(review): entries with more than three ":"-separated
                # fields match neither branch and are skipped without notice.
                if len(parts) == 2:
                    volumes.append(
                        Volume(
                            source_file=source_file,
                            source=parts[0],
                            target=parts[1],
                        )
                    )
                elif len(parts) == 3:
                    volumes.append(
                        Volume(
                            source_file=source_file,
                            source=parts[0],
                            target=parts[1],
                            access_mode=parts[2],
                        )
                    )
            elif isinstance(volume_data, spec.Volumes):
                if volume_data.target is None:
                    raise AssertionError("Invalid volume input, aborting.")
                # https://github.com/compose-spec/compose-spec/blob/master/spec.md#long-syntax-4
                # `source` is not applicable for a tmpfs mount, so fall back to
                # the target. A local is used so the parsed spec object is not
                # modified as a side effect of reading it.
                source = (
                    volume_data.source
                    if volume_data.source is not None
                    else volume_data.target
                )
                volumes.append(
                    Volume(
                        source_file=source_file,
                        source=source,
                        target=volume_data.target,
                        v_type=VolumeType[volume_data.type],
                    )
                )
        return volumes

    @staticmethod
    def _parse_env_files(service_data: spec.Service) -> dict[str, EnvFileInfo]:
        """Return the service's env files keyed by path.

        Shapes that are not recognised are reported on stdout and skipped.
        """
        env_files: dict[str, EnvFileInfo] = {}
        if service_data.env_file is None:
            return env_files

        root = service_data.env_file.root
        if isinstance(root, str):
            env_files[root] = EnvFileInfo(root)
        elif isinstance(root, list):
            for entry in root:
                if isinstance(entry, str):
                    env_files[entry] = EnvFileInfo(entry)
                elif isinstance(entry, spec.EnvFile1):
                    env_files[entry.path] = EnvFileInfo(
                        path=entry.path, required=entry.required
                    )
                else:
                    print(f"Invalid env_file data: {root}")
        else:
            print(f"Invalid env_file data: {root}")
        return env_files

    @staticmethod
    def _parse_devices(source_file: str, service_data: spec.Service) -> list[Device]:
        """Return the service's devices parsed from ``host:container[:perms]`` strings."""
        devices: list[Device] = []
        for device_data in service_data.devices or ():
            # Only string entries are handled; other entry types are skipped.
            if not isinstance(device_data, str):
                continue
            if ":" not in device_data:
                raise AssertionError("Invalid device input, aborting.")
            parts = device_data.split(":")
            if len(parts) == 2:
                devices.append(
                    Device(
                        source_file=source_file,
                        host_path=parts[0],
                        container_path=parts[1],
                    )
                )
            elif len(parts) == 3:
                devices.append(
                    Device(
                        source_file=source_file,
                        host_path=parts[0],
                        container_path=parts[1],
                        cgroup_permissions=parts[2],
                    )
                )
        return devices

    @staticmethod
    def _merge_missing(
        file: str,
        into: dict[str, TypeAPrime],
        val: Optional[dict[str, Any]],
        constructor: Callable[[str, TypeAspec], TypeAPrime],
    ) -> None:
        """Add entries of ``val`` to ``into`` for keys that ``into`` lacks.

        Each new entry is built with ``constructor(file, value)``; keys already
        present in ``into`` are left untouched.
        """
        if val is None:
            return
        for key, value in val.items():
            if key not in into:
                into[key] = constructor(file, value)

    def merge(
        self, main_compose: Compose, file: str, compose: spec.ComposeSpecification
    ) -> None:
        """Fold one parsed compose file into ``main_compose`` in place.

        Services that already exist are combined through ``Service.merge``;
        new ones are added. ``include`` entries are appended. For configs,
        networks, secrets and volumes only keys not yet present are added.

        Args:
            main_compose: Accumulated result, modified in place.
            file: Path of the compose file ``compose`` was read from.
            compose: Parsed specification of that file.
        """
        if compose.services:
            for service_name, service_data in compose.services.items():
                service = self._parse_service(
                    source_file=file,
                    service_name=service_name,
                    service_data=service_data,
                )
                if service_name in main_compose.services:
                    main_compose.services[service_name].merge(service)
                else:
                    main_compose.services[service_name] = service

        if compose.include:
            main_compose.include.extend(compose.include)
        self._merge_missing(file, main_compose.configs, compose.configs, Config)
        self._merge_missing(file, main_compose.networks, compose.networks, Network)
        self._merge_missing(file, main_compose.secrets, compose.secrets, Secret)
        self._merge_missing(file, main_compose.volumes, compose.volumes, RootVolume)
def parse_compose_files(*file_list: str) -> Compose:
    """Parse the given compose files and merge them into a single ``Compose``.

    Each path is loaded as a ``spec.ComposeSpecification`` and folded into the
    result with ``Parser.merge``, in the order the paths are given. Calling
    with no paths returns an empty ``Compose``.
    """
    merged = Compose()
    parser = Parser()
    for path in file_list:
        parser.merge(
            main_compose=merged,
            file=path,
            compose=parse_yaml_file_as(spec.ComposeSpecification, path),
        )
    return merged