emailsec.dkim

1from .checker import check_dkim, DKIMCheck, DKIMResult
2from .parser import DKIMSignature
3
4__all__ = [
5    "check_dkim",
6    "DKIMCheck",
7    "DKIMResult",
8    "DKIMSignature",
9]
async def check_dkim( message: bytes, body_and_headers: BodyAndHeaders | None = None) -> DKIMCheck:
44async def check_dkim(
45    message: bytes, body_and_headers: emailsec._utils.BodyAndHeaders | None = None
46) -> DKIMCheck:
47    if body_and_headers:
48        body, headers = body_and_headers
49    else:
50        body, headers = body_and_headers_for_canonicalization(message)
51
52    signatures = []
53    for header_name, raw_signature in headers.get("dkim-signature", []):
54        try:
55            sig = parse_dkim_header_field(raw_signature.decode())
56        except ValueError:
57            continue
58
59        signatures.append(((header_name, raw_signature), sig))
60
61    if not signatures:
62        return DKIMCheck(result=DKIMResult.PERMFAIL)
63
64    # Try to pick an aligned signature if multiple signatures are present
65    def _sort_sig(item: tuple[emailsec._utils.Header, DKIMSignature]) -> bool:
66        _, s = item
67        _, from_addr = email.utils.parseaddr(headers["from"][0][1].decode().strip())
68        rfc5322_from = from_addr.partition("@")[-1]
69        return is_dkim_aligned(s["d"], rfc5322_from)
70
71    # Verify the top 5 signatures and stop once one verifies successfully
72    for sig_header, parsed_sig in sorted(signatures, key=_sort_sig, reverse=True)[:5]:
73        try:
74            if await _verify_dkim_signature(
75                body, headers, sig_header, typing.cast(_DKIMStyleSig, parsed_sig)
76            ):
77                return DKIMCheck(
78                    result=DKIMResult.SUCCESS,
79                    domain=parsed_sig["d"],
80                    selector=parsed_sig["s"],
81                    signature=parsed_sig,
82                )
83        except errors.Temperror:
84            return DKIMCheck(
85                result=DKIMResult.TEMPFAIL,
86                domain=parsed_sig["d"],
87                selector=parsed_sig["s"],
88                signature=parsed_sig,
89            )
90        except errors.Permerror:
91            continue
92
93    return DKIMCheck(result=DKIMResult.PERMFAIL)
@dataclass
class DKIMCheck:
36@dataclass
37class DKIMCheck:
38    result: DKIMResult
39    domain: str | None = None
40    selector: str | None = None
41    signature: DKIMSignature | None = None
DKIMCheck( result: DKIMResult, domain: str | None = None, selector: str | None = None, signature: DKIMSignature | None = None)
result: DKIMResult
domain: str | None = None
selector: str | None = None
signature: DKIMSignature | None = None
class DKIMResult(enum.StrEnum):
30class DKIMResult(StrEnum):
31    SUCCESS = "SUCCESS"
32    PERMFAIL = "PERMFAIL"
33    TEMPFAIL = "TEMPFAIL"
SUCCESS = <DKIMResult.SUCCESS: 'SUCCESS'>
PERMFAIL = <DKIMResult.PERMFAIL: 'PERMFAIL'>
TEMPFAIL = <DKIMResult.TEMPFAIL: 'TEMPFAIL'>
class DKIMSignature(typing.TypedDict):
65class DKIMSignature(typing.TypedDict):
66    v: str
67    a: str
68    b: str
69    bh: str
70    c: typing.NotRequired[str]
71    d: str
72    h: str
73    i: typing.NotRequired[str]
74    l: typing.NotRequired[int]  # noqa: E741
75    q: typing.NotRequired[str]
76    s: str
77    t: typing.NotRequired[int]
78    x: typing.NotRequired[int]
79    z: typing.NotRequired[str]
v: str
a: str
b: str
bh: str
c: NotRequired[str]
d: str
h: str
i: NotRequired[str]
l: NotRequired[int]
q: NotRequired[str]
s: str
t: NotRequired[int]
x: NotRequired[int]
z: NotRequired[str]