emailsec._dns_resolver
1import asyncio 2import typing 3 4from emailsec import _errors as errors 5 6import aiodns 7import pycares 8 9_MAX_LOOKUPS = 10 10 11QueryType = typing.Literal[ 12 "A", "AAAA", "CAA", "CNAME", "MX", "NAPTR", "NS", "PTR", "SOA", "SRV", "TXT" 13] 14 15 16class DNSResolver: 17 def __init__(self, no_max_lookups: bool = False) -> None: 18 self.__lookups_counter = 0 19 self.__no_max_lookups = no_max_lookups 20 self.__resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop()) 21 22 async def _query(self, name: str, query_type: QueryType) -> typing.Any: 23 self.__lookups_counter += 1 24 if not self.__no_max_lookups and self.__lookups_counter > _MAX_LOOKUPS: 25 raise errors.Permerror("Max DNS lookups exceeded") 26 27 try: 28 return await self.__resolver.query(name, query_type) 29 except aiodns.error.DNSError as dns_error: 30 error_msg = dns_error.args[1] 31 match dns_error.args[0]: 32 case aiodns.error.ARES_ENOTFOUND | aiodns.error.ARES_ENODATA: # type: ignore 33 return None 34 case ( 35 aiodns.error.ARES_EBADQUERY # type: ignore 36 | aiodns.error.ARES_EFORMERR # type: ignore 37 | aiodns.error.ARES_EBADNAME # type: ignore 38 | aiodns.error.ARES_EBADRESP # type: ignore 39 ): 40 raise errors.Permerror( 41 f"DNS error (code {dns_error.args[0]}): {error_msg}" 42 ) 43 case _: 44 raise errors.Temperror( 45 f"DNS error (code {dns_error.args[0]}): {error_msg}" 46 ) 47 48 async def txt(self, name: str) -> list[pycares.ares_query_txt_result] | None: 49 return await self._query(name, "TXT") 50 51 async def mx(self, name: str) -> list[pycares.ares_query_mx_result] | None: 52 return await self._query(name, "MX") 53 54 async def a(self, name: str) -> list[pycares.ares_query_a_result] | None: 55 return await self._query(name, "A") 56 57 async def aaaa(self, name: str) -> list[pycares.ares_query_aaaa_result] | None: 58 return await self._query(name, "AAAA") 59 60 async def ptr(self, name: str) -> pycares.ares_query_ptr_result | None: 61 return await self._query(name, "PTR")
QueryType =
typing.Literal['A', 'AAAA', 'CAA', 'CNAME', 'MX', 'NAPTR', 'NS', 'PTR', 'SOA', 'SRV', 'TXT']
class
DNSResolver:
17class DNSResolver: 18 def __init__(self, no_max_lookups: bool = False) -> None: 19 self.__lookups_counter = 0 20 self.__no_max_lookups = no_max_lookups 21 self.__resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop()) 22 23 async def _query(self, name: str, query_type: QueryType) -> typing.Any: 24 self.__lookups_counter += 1 25 if not self.__no_max_lookups and self.__lookups_counter > _MAX_LOOKUPS: 26 raise errors.Permerror("Max DNS lookups exceeded") 27 28 try: 29 return await self.__resolver.query(name, query_type) 30 except aiodns.error.DNSError as dns_error: 31 error_msg = dns_error.args[1] 32 match dns_error.args[0]: 33 case aiodns.error.ARES_ENOTFOUND | aiodns.error.ARES_ENODATA: # type: ignore 34 return None 35 case ( 36 aiodns.error.ARES_EBADQUERY # type: ignore 37 | aiodns.error.ARES_EFORMERR # type: ignore 38 | aiodns.error.ARES_EBADNAME # type: ignore 39 | aiodns.error.ARES_EBADRESP # type: ignore 40 ): 41 raise errors.Permerror( 42 f"DNS error (code {dns_error.args[0]}): {error_msg}" 43 ) 44 case _: 45 raise errors.Temperror( 46 f"DNS error (code {dns_error.args[0]}): {error_msg}" 47 ) 48 49 async def txt(self, name: str) -> list[pycares.ares_query_txt_result] | None: 50 return await self._query(name, "TXT") 51 52 async def mx(self, name: str) -> list[pycares.ares_query_mx_result] | None: 53 return await self._query(name, "MX") 54 55 async def a(self, name: str) -> list[pycares.ares_query_a_result] | None: 56 return await self._query(name, "A") 57 58 async def aaaa(self, name: str) -> list[pycares.ares_query_aaaa_result] | None: 59 return await self._query(name, "AAAA") 60 61 async def ptr(self, name: str) -> pycares.ares_query_ptr_result | None: 62 return await self._query(name, "PTR")