emailsec._dns_resolver

 1import asyncio
 2import typing
 3
 4from emailsec import _errors as errors
 5
 6import aiodns
 7import pycares
 8
 9_MAX_LOOKUPS = 10
10
11QueryType = typing.Literal[
12    "A", "AAAA", "CAA", "CNAME", "MX", "NAPTR", "NS", "PTR", "SOA", "SRV", "TXT"
13]
14
15
16class DNSResolver:
17    def __init__(self, no_max_lookups: bool = False) -> None:
18        self.__lookups_counter = 0
19        self.__no_max_lookups = no_max_lookups
20        self.__resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop())
21
22    async def _query(self, name: str, query_type: QueryType) -> typing.Any:
23        self.__lookups_counter += 1
24        if not self.__no_max_lookups and self.__lookups_counter > _MAX_LOOKUPS:
25            raise errors.Permerror("Max DNS lookups exceeded")
26
27        try:
28            return await self.__resolver.query(name, query_type)
29        except aiodns.error.DNSError as dns_error:
30            error_msg = dns_error.args[1]
31            match dns_error.args[0]:
32                case aiodns.error.ARES_ENOTFOUND | aiodns.error.ARES_ENODATA:  # type: ignore
33                    return None
34                case (
35                    aiodns.error.ARES_EBADQUERY  # type: ignore
36                    | aiodns.error.ARES_EFORMERR  # type: ignore
37                    | aiodns.error.ARES_EBADNAME  # type: ignore
38                    | aiodns.error.ARES_EBADRESP  # type: ignore
39                ):
40                    raise errors.Permerror(
41                        f"DNS error (code {dns_error.args[0]}): {error_msg}"
42                    )
43                case _:
44                    raise errors.Temperror(
45                        f"DNS error (code {dns_error.args[0]}): {error_msg}"
46                    )
47
48    async def txt(self, name: str) -> list[pycares.ares_query_txt_result] | None:
49        return await self._query(name, "TXT")
50
51    async def mx(self, name: str) -> list[pycares.ares_query_mx_result] | None:
52        return await self._query(name, "MX")
53
54    async def a(self, name: str) -> list[pycares.ares_query_a_result] | None:
55        return await self._query(name, "A")
56
57    async def aaaa(self, name: str) -> list[pycares.ares_query_aaaa_result] | None:
58        return await self._query(name, "AAAA")
59
60    async def ptr(self, name: str) -> pycares.ares_query_ptr_result | None:
61        return await self._query(name, "PTR")
# Record types accepted as the query_type argument of DNSResolver._query.
QueryType = typing.Literal['A', 'AAAA', 'CAA', 'CNAME', 'MX', 'NAPTR', 'NS', 'PTR', 'SOA', 'SRV', 'TXT']
class DNSResolver:
17class DNSResolver:
18    def __init__(self, no_max_lookups: bool = False) -> None:
19        self.__lookups_counter = 0
20        self.__no_max_lookups = no_max_lookups
21        self.__resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop())
22
23    async def _query(self, name: str, query_type: QueryType) -> typing.Any:
24        self.__lookups_counter += 1
25        if not self.__no_max_lookups and self.__lookups_counter > _MAX_LOOKUPS:
26            raise errors.Permerror("Max DNS lookups exceeded")
27
28        try:
29            return await self.__resolver.query(name, query_type)
30        except aiodns.error.DNSError as dns_error:
31            error_msg = dns_error.args[1]
32            match dns_error.args[0]:
33                case aiodns.error.ARES_ENOTFOUND | aiodns.error.ARES_ENODATA:  # type: ignore
34                    return None
35                case (
36                    aiodns.error.ARES_EBADQUERY  # type: ignore
37                    | aiodns.error.ARES_EFORMERR  # type: ignore
38                    | aiodns.error.ARES_EBADNAME  # type: ignore
39                    | aiodns.error.ARES_EBADRESP  # type: ignore
40                ):
41                    raise errors.Permerror(
42                        f"DNS error (code {dns_error.args[0]}): {error_msg}"
43                    )
44                case _:
45                    raise errors.Temperror(
46                        f"DNS error (code {dns_error.args[0]}): {error_msg}"
47                    )
48
49    async def txt(self, name: str) -> list[pycares.ares_query_txt_result] | None:
50        return await self._query(name, "TXT")
51
52    async def mx(self, name: str) -> list[pycares.ares_query_mx_result] | None:
53        return await self._query(name, "MX")
54
55    async def a(self, name: str) -> list[pycares.ares_query_a_result] | None:
56        return await self._query(name, "A")
57
58    async def aaaa(self, name: str) -> list[pycares.ares_query_aaaa_result] | None:
59        return await self._query(name, "AAAA")
60
61    async def ptr(self, name: str) -> pycares.ares_query_ptr_result | None:
62        return await self._query(name, "PTR")
DNSResolver(no_max_lookups: bool = False)
18    def __init__(self, no_max_lookups: bool = False) -> None:
19        self.__lookups_counter = 0
20        self.__no_max_lookups = no_max_lookups
21        self.__resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop())
async def txt(self, name: str) -> list[pycares.ares_query_txt_result] | None:
49    async def txt(self, name: str) -> list[pycares.ares_query_txt_result] | None:
50        return await self._query(name, "TXT")
async def mx(self, name: str) -> list[pycares.ares_query_mx_result] | None:
52    async def mx(self, name: str) -> list[pycares.ares_query_mx_result] | None:
53        return await self._query(name, "MX")
async def a(self, name: str) -> list[pycares.ares_query_a_result] | None:
55    async def a(self, name: str) -> list[pycares.ares_query_a_result] | None:
56        return await self._query(name, "A")
async def aaaa(self, name: str) -> list[pycares.ares_query_aaaa_result] | None:
58    async def aaaa(self, name: str) -> list[pycares.ares_query_aaaa_result] | None:
59        return await self._query(name, "AAAA")
async def ptr(self, name: str) -> pycares.ares_query_ptr_result | None:
61    async def ptr(self, name: str) -> pycares.ares_query_ptr_result | None:
62        return await self._query(name, "PTR")