emailsec._dns_resolver

 1import asyncio
 2import typing
 3
 4from emailsec import _errors as errors
 5
 6import aiodns
 7import pycares
 8
# Cap on the number of queries a single DNSResolver instance will issue:
# DNSResolver._query raises errors.Permerror once its per-instance counter
# exceeds this value, unless the resolver was created with no_max_lookups=True.
_MAX_LOOKUPS = 10
10
11
12class DNSResolver:
13    def __init__(self, no_max_lookups: bool = False) -> None:
14        self.__lookups_counter = 0
15        self.__no_max_lookups = no_max_lookups
16        self.__resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop())
17
18    async def _query(self, name: str, query_type: str) -> typing.Any:
19        self.__lookups_counter += 1
20        if not self.__no_max_lookups and self.__lookups_counter > _MAX_LOOKUPS:
21            raise errors.Permerror("Max DNS lookups exceeded")
22
23        try:
24            return await self.__resolver.query(name, query_type)
25        except aiodns.error.DNSError as dns_error:
26            error_msg = dns_error.args[1]
27            match dns_error.args[0]:
28                case aiodns.error.ARES_ENOTFOUND | aiodns.error.ARES_ENODATA:  # type: ignore
29                    return None
30                case (
31                    aiodns.error.ARES_EBADQUERY  # type: ignore
32                    | aiodns.error.ARES_EFORMERR  # type: ignore
33                    | aiodns.error.ARES_EBADNAME  # type: ignore
34                    | aiodns.error.ARES_EBADRESP  # type: ignore
35                ):
36                    raise errors.Permerror(
37                        f"DNS error (code {dns_error.args[0]}): {error_msg}"
38                    )
39                case _:
40                    raise errors.Temperror(
41                        f"DNS error (code {dns_error.args[0]}): {error_msg}"
42                    )
43
44    async def txt(self, name: str) -> list[pycares.ares_query_txt_result] | None:
45        return await self._query(name, "TXT")
46
47    async def mx(self, name: str) -> list[pycares.ares_query_mx_result] | None:
48        return await self._query(name, "MX")
49
50    async def a(self, name: str) -> list[pycares.ares_query_a_result] | None:
51        return await self._query(name, "A")
52
53    async def aaaa(self, name: str) -> list[pycares.ares_query_aaaa_result] | None:
54        return await self._query(name, "AAAA")
55
56    async def ptr(self, name: str) -> pycares.ares_query_ptr_result | None:
57        return await self._query(name, "PTR")
class DNSResolver:
13class DNSResolver:
14    def __init__(self, no_max_lookups: bool = False) -> None:
15        self.__lookups_counter = 0
16        self.__no_max_lookups = no_max_lookups
17        self.__resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop())
18
19    async def _query(self, name: str, query_type: str) -> typing.Any:
20        self.__lookups_counter += 1
21        if not self.__no_max_lookups and self.__lookups_counter > _MAX_LOOKUPS:
22            raise errors.Permerror("Max DNS lookups exceeded")
23
24        try:
25            return await self.__resolver.query(name, query_type)
26        except aiodns.error.DNSError as dns_error:
27            error_msg = dns_error.args[1]
28            match dns_error.args[0]:
29                case aiodns.error.ARES_ENOTFOUND | aiodns.error.ARES_ENODATA:  # type: ignore
30                    return None
31                case (
32                    aiodns.error.ARES_EBADQUERY  # type: ignore
33                    | aiodns.error.ARES_EFORMERR  # type: ignore
34                    | aiodns.error.ARES_EBADNAME  # type: ignore
35                    | aiodns.error.ARES_EBADRESP  # type: ignore
36                ):
37                    raise errors.Permerror(
38                        f"DNS error (code {dns_error.args[0]}): {error_msg}"
39                    )
40                case _:
41                    raise errors.Temperror(
42                        f"DNS error (code {dns_error.args[0]}): {error_msg}"
43                    )
44
45    async def txt(self, name: str) -> list[pycares.ares_query_txt_result] | None:
46        return await self._query(name, "TXT")
47
48    async def mx(self, name: str) -> list[pycares.ares_query_mx_result] | None:
49        return await self._query(name, "MX")
50
51    async def a(self, name: str) -> list[pycares.ares_query_a_result] | None:
52        return await self._query(name, "A")
53
54    async def aaaa(self, name: str) -> list[pycares.ares_query_aaaa_result] | None:
55        return await self._query(name, "AAAA")
56
57    async def ptr(self, name: str) -> pycares.ares_query_ptr_result | None:
58        return await self._query(name, "PTR")
DNSResolver(no_max_lookups: bool = False)
14    def __init__(self, no_max_lookups: bool = False) -> None:
15        self.__lookups_counter = 0
16        self.__no_max_lookups = no_max_lookups
17        self.__resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop())
async def txt(self, name: str) -> list[pycares.ares_query_txt_result] | None:
45    async def txt(self, name: str) -> list[pycares.ares_query_txt_result] | None:
46        return await self._query(name, "TXT")
async def mx(self, name: str) -> list[pycares.ares_query_mx_result] | None:
48    async def mx(self, name: str) -> list[pycares.ares_query_mx_result] | None:
49        return await self._query(name, "MX")
async def a(self, name: str) -> list[pycares.ares_query_a_result] | None:
51    async def a(self, name: str) -> list[pycares.ares_query_a_result] | None:
52        return await self._query(name, "A")
async def aaaa(self, name: str) -> list[pycares.ares_query_aaaa_result] | None:
54    async def aaaa(self, name: str) -> list[pycares.ares_query_aaaa_result] | None:
55        return await self._query(name, "AAAA")
async def ptr(self, name: str) -> pycares.ares_query_ptr_result | None:
57    async def ptr(self, name: str) -> pycares.ares_query_ptr_result | None:
58        return await self._query(name, "PTR")