Coverage for src / documint_mcp / connectors / notifications.py: 0%
55 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 22:30 -0400
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 22:30 -0400
1"""Event dispatcher that routes lifecycle events to configured connectors.
3Auto-detects available connectors at startup based on environment variables.
4If no connectors are configured the dispatcher is a silent no-op -- zero
5errors, zero overhead.
6"""
8from __future__ import annotations
10import logging
11import os
13from documint_mcp.models import DriftFinding, DocPatch, VerificationRun
15from .base import ConnectorBase
17logger = logging.getLogger(__name__)
class NotificationDispatcher:
    """Fan-out dispatcher for notification events.

    On initialization, probes the environment for configured connectors
    and lazily imports only the ones that are available. Each event
    method hands its payload to every active connector in turn, catching
    and logging per-connector errors so that one failure never blocks
    the others.
    """

    def __init__(self) -> None:
        """Start with no connectors, then register the ones the environment enables."""
        self._connectors: list[ConnectorBase] = []
        self._detect_connectors()

    def _detect_connectors(self) -> None:
        """Probe environment variables and register each usable connector.

        Any exception raised while importing or constructing a connector
        is logged and swallowed, so a broken connector is simply skipped.
        """
        # -- Slack -----------------------------------------------------------
        if os.getenv("SLACK_BOT_TOKEN"):
            try:
                # Imported here so the Slack module is only loaded when a
                # token is actually present in the environment.
                from .slack import SlackConnector

                connector = SlackConnector()
                if connector.is_configured:
                    self._connectors.append(connector)
                    logger.info("NotificationDispatcher: Slack connector enabled")
                else:
                    logger.info(
                        "NotificationDispatcher: SLACK_BOT_TOKEN set but "
                        "SLACK_CHANNEL missing or slack-sdk not installed; skipping"
                    )
            except Exception:
                logger.exception(
                    "NotificationDispatcher: failed to initialize Slack connector"
                )

        if not self._connectors:
            logger.debug(
                "NotificationDispatcher: no connectors configured; "
                "notifications will be silent no-ops"
            )

    @property
    def active_connectors(self) -> int:
        """Return the number of connectors currently registered."""
        return len(self._connectors)

    # -- event methods -------------------------------------------------------

    async def _fan_out(
        self,
        method_name: str,
        failure_message: str,
        project_id: str,
        payload: object,
    ) -> None:
        """Await ``method_name(project_id, payload)`` on each connector in order.

        Shared delivery loop for all event methods. Connectors are awaited
        one after another; with no connectors registered this is a no-op.

        Args:
            method_name: Name of the connector method to look up and await.
            failure_message: ``%``-style log template that receives the
                connector class name and the project id when a connector
                raises.
            project_id: Identifier passed through to every connector.
            payload: Event payload passed through to every connector.
        """
        for connector in self._connectors:
            try:
                # The attribute lookup stays inside the ``try`` so that a
                # connector lacking the method is logged like any other
                # delivery failure instead of aborting the fan-out.
                await getattr(connector, method_name)(project_id, payload)
            except Exception:
                # Best-effort delivery: record the failure and carry on
                # with the remaining connectors.
                logger.exception(
                    failure_message,
                    type(connector).__name__,
                    project_id,
                )

    async def on_drift_detected(
        self,
        project_id: str,
        findings: list[DriftFinding],
    ) -> None:
        """Notify all connectors that drift findings have been detected.

        Does nothing when no connectors are registered or ``findings`` is
        empty.
        """
        if not self._connectors or not findings:
            return
        await self._fan_out(
            "send_drift_alert",
            "Connector %s failed on drift alert for %s",
            project_id,
            findings,
        )

    async def on_patch_drafted(
        self,
        project_id: str,
        patch: DocPatch,
    ) -> None:
        """Notify all connectors that a doc patch has been drafted."""
        await self._fan_out(
            "send_patch_preview",
            "Connector %s failed on patch preview for %s",
            project_id,
            patch,
        )

    async def on_digest_ready(
        self,
        project_id: str,
        report: VerificationRun,
    ) -> None:
        """Notify all connectors with a periodic health digest."""
        await self._fan_out(
            "send_digest",
            "Connector %s failed on digest for %s",
            project_id,
            report,
        )
120# ---------------------------------------------------------------------------
121# Module-level singleton
122# ---------------------------------------------------------------------------
_dispatcher: NotificationDispatcher | None = None


def get_notification_dispatcher() -> NotificationDispatcher:
    """Return the shared dispatcher, building it the first time it is requested.

    The instance is cached in the module-level ``_dispatcher`` variable, so
    every later call returns that same object.
    """
    global _dispatcher
    cached = _dispatcher
    if cached is not None:
        return cached
    # First call: construct the dispatcher and remember it for later callers.
    _dispatcher = created = NotificationDispatcher()
    return created