emailsec._dns_resolver
1import asyncio 2import typing 3 4from emailsec import _errors as errors 5 6import aiodns 7import pycares 8 9_MAX_LOOKUPS = 10 10 11 12class DNSResolver: 13 def __init__(self, no_max_lookups: bool = False) -> None: 14 self.__lookups_counter = 0 15 self.__no_max_lookups = no_max_lookups 16 self.__resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop()) 17 18 async def _query(self, name: str, query_type: str) -> typing.Any: 19 self.__lookups_counter += 1 20 if not self.__no_max_lookups and self.__lookups_counter > _MAX_LOOKUPS: 21 raise errors.Permerror("Max DNS lookups exceeded") 22 23 try: 24 return await self.__resolver.query(name, query_type) 25 except aiodns.error.DNSError as dns_error: 26 error_msg = dns_error.args[1] 27 match dns_error.args[0]: 28 case aiodns.error.ARES_ENOTFOUND | aiodns.error.ARES_ENODATA: # type: ignore 29 return None 30 case ( 31 aiodns.error.ARES_EBADQUERY # type: ignore 32 | aiodns.error.ARES_EFORMERR # type: ignore 33 | aiodns.error.ARES_EBADNAME # type: ignore 34 | aiodns.error.ARES_EBADRESP # type: ignore 35 ): 36 raise errors.Permerror( 37 f"DNS error (code {dns_error.args[0]}): {error_msg}" 38 ) 39 case _: 40 raise errors.Temperror( 41 f"DNS error (code {dns_error.args[0]}): {error_msg}" 42 ) 43 44 async def txt(self, name: str) -> list[pycares.ares_query_txt_result] | None: 45 return await self._query(name, "TXT") 46 47 async def mx(self, name: str) -> list[pycares.ares_query_mx_result] | None: 48 return await self._query(name, "MX") 49 50 async def a(self, name: str) -> list[pycares.ares_query_a_result] | None: 51 return await self._query(name, "A") 52 53 async def aaaa(self, name: str) -> list[pycares.ares_query_aaaa_result] | None: 54 return await self._query(name, "AAAA") 55 56 async def ptr(self, name: str) -> pycares.ares_query_ptr_result | None: 57 return await self._query(name, "PTR")
class
DNSResolver:
13class DNSResolver: 14 def __init__(self, no_max_lookups: bool = False) -> None: 15 self.__lookups_counter = 0 16 self.__no_max_lookups = no_max_lookups 17 self.__resolver = aiodns.DNSResolver(loop=asyncio.get_running_loop()) 18 19 async def _query(self, name: str, query_type: str) -> typing.Any: 20 self.__lookups_counter += 1 21 if not self.__no_max_lookups and self.__lookups_counter > _MAX_LOOKUPS: 22 raise errors.Permerror("Max DNS lookups exceeded") 23 24 try: 25 return await self.__resolver.query(name, query_type) 26 except aiodns.error.DNSError as dns_error: 27 error_msg = dns_error.args[1] 28 match dns_error.args[0]: 29 case aiodns.error.ARES_ENOTFOUND | aiodns.error.ARES_ENODATA: # type: ignore 30 return None 31 case ( 32 aiodns.error.ARES_EBADQUERY # type: ignore 33 | aiodns.error.ARES_EFORMERR # type: ignore 34 | aiodns.error.ARES_EBADNAME # type: ignore 35 | aiodns.error.ARES_EBADRESP # type: ignore 36 ): 37 raise errors.Permerror( 38 f"DNS error (code {dns_error.args[0]}): {error_msg}" 39 ) 40 case _: 41 raise errors.Temperror( 42 f"DNS error (code {dns_error.args[0]}): {error_msg}" 43 ) 44 45 async def txt(self, name: str) -> list[pycares.ares_query_txt_result] | None: 46 return await self._query(name, "TXT") 47 48 async def mx(self, name: str) -> list[pycares.ares_query_mx_result] | None: 49 return await self._query(name, "MX") 50 51 async def a(self, name: str) -> list[pycares.ares_query_a_result] | None: 52 return await self._query(name, "A") 53 54 async def aaaa(self, name: str) -> list[pycares.ares_query_aaaa_result] | None: 55 return await self._query(name, "AAAA") 56 57 async def ptr(self, name: str) -> pycares.ares_query_ptr_result | None: 58 return await self._query(name, "PTR")