sentineliqsdk

Public package entry points for SentinelIQ SDK.

Exports the modern API and a convenience runner for workers.

 1"""Public package entry points for SentinelIQ SDK.
 2
 3Exports the modern API and a convenience runner for workers.
 4"""
 5
 6from __future__ import annotations
 7
 8from typing import Protocol, TypeVar
 9
10from sentineliqsdk.analyzers import Analyzer
11from sentineliqsdk.core import Worker
12from sentineliqsdk.extractors import Extractor
13from sentineliqsdk.responders import Responder
14
15__all__ = ["Analyzer", "Extractor", "Responder", "Worker", "runner"]
16
17
class Runnable(Protocol):
    """Structural type satisfied by any object that offers a ``run()`` method."""

    def run(self) -> None:  # pragma: no cover - typing-only contract
        """Execute the worker; the protocol itself carries no behaviour."""


# Type variable restricted to classes whose instances satisfy ``Runnable``.
T = TypeVar("T", bound=Runnable)


def runner(worker_cls: type[T]) -> None:
    """Build ``worker_cls`` with no arguments and invoke its ``run()`` method.

    :param worker_cls: Class to instantiate; its instances must provide ``run()``.
    """
    instance: Runnable = worker_cls()  # type: ignore[call-arg]
    instance.run()
class Analyzer(sentineliqsdk.Worker):
 18class Analyzer(Worker):
 19    """Base class for analyzers with auto-extraction and helpers."""
 20
 21    def __init__(self, job_directory: str | None = None, secret_phrases=None) -> None:
 22        super().__init__(job_directory, secret_phrases)
 23        self.auto_extract: bool = self.get_param("config.auto_extract", True)
 24
 25    def get_data(self) -> Any:
 26        """Return the observable value or filename for `file` datatypes."""
 27        if self.data_type == "file":
 28            return self.get_param("filename", None, "Missing filename.")
 29        return self.get_param("data", None, "Missing data field")
 30
 31    def get_param(self, name: str, default: Any | None = None, message: str | None = None) -> Any:
 32        """Resolve dotted name; special-case `file`/`filename` for job-dir absolute path.
 33
 34        - When `dataType == "file"` and running in job-directory mode, `get_param("file")`
 35          maps to the underlying `filename` value and, if the file exists under
 36          `<job_dir>/input/`, returns its absolute path. If not found, returns the raw value.
 37        - `get_param("filename")` behaves the same in this context.
 38        """
 39        # Determine the base key to fetch if the accessor is the logical "file" alias.
 40        base_key = "filename" if (name == "file") else name
 41        data = super().get_param(base_key, default, message)
 42        if (
 43            base_key in {"file", "filename"}
 44            and self.data_type == "file"
 45            and self.job_directory is not None
 46            and isinstance(data, str)
 47        ):
 48            path = f"{self.job_directory}/input/{data}"
 49            if os.path.isfile(path):
 50                return path
 51        return data
 52
 53    def build_taxonomy(
 54        self, level: TaxonomyLevel, namespace: str, predicate: str, value: str
 55    ) -> dict:
 56        """Create a normalized taxonomy entry for report metadata."""
 57        if level not in ("info", "safe", "suspicious", "malicious"):
 58            level = "info"
 59        return {
 60            "level": level,
 61            "namespace": namespace,
 62            "predicate": predicate,
 63            "value": value,
 64        }
 65
 66    def summary(self, raw: Any) -> dict:
 67        """Return analyzer-specific short summary (optional)."""
 68        return {}
 69
 70    def artifacts(self, raw: Any) -> list[dict]:
 71        """Auto-extract IOCs from the full report when enabled."""
 72        if self.auto_extract:
 73            extractor = Extractor(ignore=self.get_data())
 74            return extractor.check_iterable(raw)
 75        return []
 76
 77    def _copy_file_to_output(self, src_path: str) -> dict | None:
 78        """Copy a source file into `<job_dir>/output/` and return file metadata.
 79
 80        When running in STDIN mode (`job_directory is None`), this returns None to
 81        avoid creating an `output/` directory in the current working directory.
 82        """
 83        if self.job_directory is None:
 84            return None
 85        if not os.path.isfile(src_path):
 86            return None
 87        output_dir = os.path.join(self.job_directory or "", "output")
 88        os.makedirs(output_dir, exist_ok=True)
 89        with tempfile.NamedTemporaryFile(dir=output_dir, delete=False) as dst:
 90            with open(src_path, "rb") as src:
 91                copyfileobj(src, dst)
 92            dstfname = dst.name
 93        os.chmod(dstfname, 0o444)
 94        return {"file": os.path.basename(dstfname), "filename": os.path.basename(src_path)}
 95
 96    def build_artifact(self, data_type: str, data: Any, **kwargs: Any) -> dict | None:
 97        """Build an artifact dict; copy files to output for `file` type.
 98
 99        In STDIN mode (`job_directory is None`), `file` artifacts are skipped
100        and this returns None.
101        """
102        if data_type == "file":
103            file_fields = self._copy_file_to_output(str(data))
104            if file_fields is None:
105                return None
106            out = {"dataType": data_type, **file_fields, **kwargs}
107            return out
108        return {"dataType": data_type, "data": data, **kwargs}
109
110    def report(self, full_report: dict, ensure_ascii: bool = False) -> None:
111        """Wrap full report with SDK envelope and write JSON output."""
112        summary: dict = {}
113        with suppress(Exception):
114            summary = self.summary(full_report)
115        operation_list: list[dict] = []
116        with suppress(Exception):
117            operation_list = self.operations(full_report)
118        super().report(
119            {
120                "success": True,
121                "summary": summary,
122                "artifacts": self.artifacts(full_report),
123                "operations": operation_list,
124                "full": full_report,
125            },
126            ensure_ascii,
127        )
128
129    def run(self) -> None:  # pragma: no cover - to be overridden
130        """Override in subclasses."""

Base class for analyzers with auto-extraction and helpers.

Analyzer(job_directory: str | None = None, secret_phrases=None)
21    def __init__(self, job_directory: str | None = None, secret_phrases=None) -> None:
22        super().__init__(job_directory, secret_phrases)
23        self.auto_extract: bool = self.get_param("config.auto_extract", True)
auto_extract: bool
def get_data(self) -> Any:
25    def get_data(self) -> Any:
26        """Return the observable value or filename for `file` datatypes."""
27        if self.data_type == "file":
28            return self.get_param("filename", None, "Missing filename.")
29        return self.get_param("data", None, "Missing data field")

Return the observable value or filename for file datatypes.

def get_param( self, name: str, default: typing.Any | None = None, message: str | None = None) -> Any:
31    def get_param(self, name: str, default: Any | None = None, message: str | None = None) -> Any:
32        """Resolve dotted name; special-case `file`/`filename` for job-dir absolute path.
33
34        - When `dataType == "file"` and running in job-directory mode, `get_param("file")`
35          maps to the underlying `filename` value and, if the file exists under
36          `<job_dir>/input/`, returns its absolute path. If not found, returns the raw value.
37        - `get_param("filename")` behaves the same in this context.
38        """
39        # Determine the base key to fetch if the accessor is the logical "file" alias.
40        base_key = "filename" if (name == "file") else name
41        data = super().get_param(base_key, default, message)
42        if (
43            base_key in {"file", "filename"}
44            and self.data_type == "file"
45            and self.job_directory is not None
46            and isinstance(data, str)
47        ):
48            path = f"{self.job_directory}/input/{data}"
49            if os.path.isfile(path):
50                return path
51        return data

Resolve dotted name; special-case file/filename for job-dir absolute path.

  • When dataType == "file" and running in job-directory mode, get_param("file") maps to the underlying filename value and, if the file exists under <job_dir>/input/, returns its absolute path. If not found, returns the raw value.
  • get_param("filename") behaves the same in this context.
def build_taxonomy( self, level: Literal['info', 'safe', 'suspicious', 'malicious'], namespace: str, predicate: str, value: str) -> dict:
53    def build_taxonomy(
54        self, level: TaxonomyLevel, namespace: str, predicate: str, value: str
55    ) -> dict:
56        """Create a normalized taxonomy entry for report metadata."""
57        if level not in ("info", "safe", "suspicious", "malicious"):
58            level = "info"
59        return {
60            "level": level,
61            "namespace": namespace,
62            "predicate": predicate,
63            "value": value,
64        }

Create a normalized taxonomy entry for report metadata.

def summary(self, raw: Any) -> dict:
66    def summary(self, raw: Any) -> dict:
67        """Return analyzer-specific short summary (optional)."""
68        return {}

Return analyzer-specific short summary (optional).

def artifacts(self, raw: Any) -> list[dict]:
70    def artifacts(self, raw: Any) -> list[dict]:
71        """Auto-extract IOCs from the full report when enabled."""
72        if self.auto_extract:
73            extractor = Extractor(ignore=self.get_data())
74            return extractor.check_iterable(raw)
75        return []

Auto-extract IOCs from the full report when enabled.

def build_artifact(self, data_type: str, data: Any, **kwargs: Any) -> dict | None:
 96    def build_artifact(self, data_type: str, data: Any, **kwargs: Any) -> dict | None:
 97        """Build an artifact dict; copy files to output for `file` type.
 98
 99        In STDIN mode (`job_directory is None`), `file` artifacts are skipped
100        and this returns None.
101        """
102        if data_type == "file":
103            file_fields = self._copy_file_to_output(str(data))
104            if file_fields is None:
105                return None
106            out = {"dataType": data_type, **file_fields, **kwargs}
107            return out
108        return {"dataType": data_type, "data": data, **kwargs}

Build an artifact dict; copy files to output for file type.

In STDIN mode (job_directory is None), file artifacts are skipped and this returns None.

def report(self, full_report: dict, ensure_ascii: bool = False) -> None:
110    def report(self, full_report: dict, ensure_ascii: bool = False) -> None:
111        """Wrap full report with SDK envelope and write JSON output."""
112        summary: dict = {}
113        with suppress(Exception):
114            summary = self.summary(full_report)
115        operation_list: list[dict] = []
116        with suppress(Exception):
117            operation_list = self.operations(full_report)
118        super().report(
119            {
120                "success": True,
121                "summary": summary,
122                "artifacts": self.artifacts(full_report),
123                "operations": operation_list,
124                "full": full_report,
125            },
126            ensure_ascii,
127        )

Wrap full report with SDK envelope and write JSON output.

def run(self) -> None:
129    def run(self) -> None:  # pragma: no cover - to be overridden
130        """Override in subclasses."""

Override in subclasses.

class Extractor:
 27class Extractor:
 28    """Detect IOC attribute types using stdlib-backed heuristics.
 29
 30    Two functions are provided:
 31      - ``check_string(str)`` which checks a string and returns the type.
 32      - ``check_iterable(itr)`` that iterates over a list or a dictionary and returns a
 33        list of {type, value} dicts.
 34
 35    Note: not a full-text search; IOC values must appear as isolated strings.
 36
 37    :param ignore: String to ignore when matching artifacts to type
 38    """
 39
 40    def __init__(self, ignore: str | None = None):
 41        self.ignore = ignore
 42        # Small per-instance cache to avoid repeated classification work.
 43        self._cache: dict[tuple[str | None, str], str] = {}
 44
 45    # --- Type checks ---
 46    @staticmethod
 47    def _is_ip(value: str) -> bool:
 48        try:
 49            ipaddress.ip_address(value)
 50            return True
 51        except ValueError:
 52            return False
 53
 54    @staticmethod
 55    def _is_url(value: str) -> bool:
 56        if not value.startswith(("http://", "https://")):
 57            return False
 58        parsed = urlparse(value)
 59        return bool(parsed.scheme in {"http", "https"} and parsed.netloc)
 60
 61    @staticmethod
 62    def _label_allowed(label: str) -> bool:
 63        allowed = set(string.ascii_letters + string.digits + "_-")
 64        return bool(label) and all(c in allowed for c in label)
 65
 66    @classmethod
 67    def _is_domain(cls, value: str) -> bool:
 68        if value.startswith(("http://", "https://")):
 69            return False
 70        parts = value.split(".")
 71        if len(parts) != DOMAIN_PARTS:
 72            return False
 73        left, tld = parts
 74        return cls._label_allowed(left) and tld.isalpha()
 75
 76    @staticmethod
 77    def _is_hash(value: str) -> bool:
 78        if len(value) not in {32, 40, 64}:
 79            return False
 80        hexd = set(string.hexdigits)
 81        return all(c in hexd for c in value)
 82
 83    @staticmethod
 84    def _is_user_agent(value: str) -> bool:
 85        return value.startswith(("Mozilla/4.0 ", "Mozilla/5.0 "))
 86
 87    @staticmethod
 88    def _is_uri_path(value: str) -> bool:
 89        if value.startswith(("http://", "https://")):
 90            return False
 91        parsed = urlparse(value)
 92        return bool(parsed.scheme and "://" in value)
 93
 94    @staticmethod
 95    def _is_registry(value: str) -> bool:
 96        prefixes = ("HKEY", "HKLM", "HKCU", "HKCR", "HKCC")
 97        if not value.startswith(prefixes):
 98            return False
 99        return "\\" in value
100
101    @staticmethod
102    def _is_mail(value: str) -> bool:
103        name, addr = parseaddr(value)
104        if addr != value:
105            return False
106        if "@" not in addr:
107            return False
108        local, _, domain = addr.partition("@")
109        return bool(local and domain and "." in domain)
110
111    @classmethod
112    def _is_fqdn(cls, value: str) -> bool:
113        if value.startswith(("http://", "https://")):
114            return False
115        parts = value.split(".")
116        if len(parts) < MIN_FQDN_LABELS:
117            return False
118        *labels, tld = parts
119        return all(cls._label_allowed(lbl) for lbl in labels) and tld.isalpha()
120
121    def __checktype(self, value: Any) -> str:
122        """Check if the given value is a known datatype.
123
124        :param value: The value to check
125        :type value: str or number
126        :return: Data type of value, if known, else empty string
127        :rtype: str
128        """
129        if self.ignore:
130            # Ignore only exact matches to avoid hiding valid IOCs that merely
131            # contain the observable as a substring.
132            if isinstance(value, str) and self.ignore == value:
133                return ""
134
135        if isinstance(value, str):
136            key = (self.ignore, value)
137            if key in self._cache:
138                return self._cache[key]
139
140            dtype = ""
141            checks: list[tuple[Callable[[str], bool], str]] = [
142                (self._is_ip, "ip"),
143                (self._is_url, "url"),
144                (self._is_domain, "domain"),
145                (self._is_hash, "hash"),
146                (self._is_user_agent, "user-agent"),
147                (self._is_uri_path, "uri_path"),
148                (self._is_registry, "registry"),
149                (self._is_mail, "mail"),
150                (self._is_fqdn, "fqdn"),
151            ]
152            for predicate, dtype_name in checks:
153                if predicate(value):
154                    dtype = dtype_name
155                    break
156
157            self._cache[key] = dtype
158            return dtype
159        return ""
160
161    def check_string(self, value: str) -> str:
162        """Check if a string matches a datatype.
163
164        :param value: String to test
165        :type value: str
166        :return: Data type or empty string
167        :rtype: str
168        """
169        return self.__checktype(value)
170
171    def check_iterable(self, iterable: Any) -> list[dict[str, str]]:
172        """Check values of a list or a dict for IOCs.
173
174        Returns a list of dict {type, value}. Raises TypeError if iterable is not an
175        expected type.
176
177        :param iterable: List or dict of values
178        :type iterable: list | dict | str
179        :return: List of IOCs matching the regex
180        :rtype: list
181        """
182        results: list[dict[str, str]] = []
183
184        if not isinstance(iterable, str | list | dict):
185            raise TypeError("Not supported type.")
186
187        stack: list[Any] = [iterable]
188        while stack:
189            item = stack.pop()
190            if isinstance(item, dict):
191                stack.extend(item.values())
192            elif isinstance(item, list):
193                stack.extend(item)
194            elif isinstance(item, str):
195                dt = self.__checktype(item)
196                if dt:
197                    results.append({"dataType": dt, "data": item})
198
199        return self.deduplicate(results)
200
201    @staticmethod
202    def deduplicate(list_of_objects: list[dict[str, str]]) -> list[dict[str, str]]:
203        """Deduplicate list of IOC objects by type + data in O(n)."""
204        seen: set[tuple[str, str]] = set()
205        dedup_list: list[dict[str, str]] = []
206        for obj in list_of_objects:
207            key = (obj["dataType"], obj["data"])
208            if key in seen:
209                continue
210            seen.add(key)
211            dedup_list.append(obj)
212        return dedup_list

Detect IOC attribute types using stdlib-backed heuristics.

Two functions are provided:

  • check_string(str) which checks a string and returns the type.
  • check_iterable(itr) that iterates over a list or a dictionary and returns a list of {dataType, data} dicts.

Note: not a full-text search; IOC values must appear as isolated strings.

Parameters
  • ignore: String to ignore when matching artifacts to type
Extractor(ignore: str | None = None)
40    def __init__(self, ignore: str | None = None):
41        self.ignore = ignore
42        # Small per-instance cache to avoid repeated classification work.
43        self._cache: dict[tuple[str | None, str], str] = {}
ignore
def check_string(self, value: str) -> str:
161    def check_string(self, value: str) -> str:
162        """Check if a string matches a datatype.
163
164        :param value: String to test
165        :type value: str
166        :return: Data type or empty string
167        :rtype: str
168        """
169        return self.__checktype(value)

Check if a string matches a datatype.

Parameters
  • value: String to test
Returns

Data type or empty string

def check_iterable(self, iterable: Any) -> list[dict[str, str]]:
171    def check_iterable(self, iterable: Any) -> list[dict[str, str]]:
172        """Check values of a list or a dict for IOCs.
173
174        Returns a list of dict {type, value}. Raises TypeError if iterable is not an
175        expected type.
176
177        :param iterable: List or dict of values
178        :type iterable: list | dict | str
179        :return: List of IOCs matching the regex
180        :rtype: list
181        """
182        results: list[dict[str, str]] = []
183
184        if not isinstance(iterable, str | list | dict):
185            raise TypeError("Not supported type.")
186
187        stack: list[Any] = [iterable]
188        while stack:
189            item = stack.pop()
190            if isinstance(item, dict):
191                stack.extend(item.values())
192            elif isinstance(item, list):
193                stack.extend(item)
194            elif isinstance(item, str):
195                dt = self.__checktype(item)
196                if dt:
197                    results.append({"dataType": dt, "data": item})
198
199        return self.deduplicate(results)

Check values of a list or a dict for IOCs.

Returns a list of {dataType, data} dicts. Raises TypeError if iterable is not a str, list or dict.

Parameters
  • iterable: List or dict of values
Returns

List of detected IOCs

@staticmethod
def deduplicate(list_of_objects: list[dict[str, str]]) -> list[dict[str, str]]:
201    @staticmethod
202    def deduplicate(list_of_objects: list[dict[str, str]]) -> list[dict[str, str]]:
203        """Deduplicate list of IOC objects by type + data in O(n)."""
204        seen: set[tuple[str, str]] = set()
205        dedup_list: list[dict[str, str]] = []
206        for obj in list_of_objects:
207            key = (obj["dataType"], obj["data"])
208            if key in seen:
209                continue
210            seen.add(key)
211            dedup_list.append(obj)
212        return dedup_list

Deduplicate list of IOC objects by type + data in O(n).

class Responder(sentineliqsdk.Worker):
class Responder(Worker):
    """Worker base class for responders."""

    def __init__(self, job_directory: str | None = None, secret_phrases=None):
        """Forward both arguments unchanged to the base worker initialiser."""
        super().__init__(job_directory, secret_phrases)

    def get_data(self):
        """Return the ``data`` field of the input.

        The lookup is made with the message "Missing data field", so an absent
        value is reported through the worker's error path.

        :return: Data (observable value) from the input
        """
        return self.get_param("data", None, "Missing data field")

    def report(self, full_report, ensure_ascii: bool = False):
        """Wrap the responder result in the success envelope and pass it to ``Worker.report``.

        :param full_report: Responder results as dict.
        :param ensure_ascii: Force ascii output. Default: False
        """
        # A failing ``operations()`` is deliberately tolerated: report with none.
        try:
            operation_list = self.operations(full_report)
        except Exception:
            operation_list = []
        envelope = {"success": True, "full": full_report, "operations": operation_list}
        super().report(envelope, ensure_ascii)

    def run(self):
        """Responder entry point; subclasses provide the implementation."""

Base class for responders.

Responder(job_directory: str | None = None, secret_phrases=None)
14    def __init__(self, job_directory: str | None = None, secret_phrases=None):
15        super().__init__(job_directory, secret_phrases)
def get_data(self):
17    def get_data(self):
18        """Return data from input dict.
19
20        :return: Data (observable value) given through Cortex
21        """
22        return self.get_param("data", None, "Missing data field")

Return data from input dict.

Returns

Data (observable value) given through Cortex

def report(self, full_report, ensure_ascii: bool = False):
24    def report(self, full_report, ensure_ascii: bool = False):
25        """Return a JSON dict via stdout.
26
27        :param full_report: Responsder results as dict.
28        :param ensure_ascii: Force ascii output. Default: False
29        """
30        operation_list = []
31        with suppress(Exception):
32            operation_list = self.operations(full_report)
33        super().report(
34            {"success": True, "full": full_report, "operations": operation_list},
35            ensure_ascii,
36        )

Return a JSON dict via stdout.

Parameters
  • full_report: Responder results as dict.
  • ensure_ascii: Force ascii output. Default: False
def run(self):
38    def run(self):
39        """Overwritten by responders."""

Overwritten by responders.

class Worker(abc.ABC):
 31class Worker(ABC):
 32    """Common functionality for analyzers and responders."""
 33
 34    READ_TIMEOUT = 3  # seconds
 35
 36    def __init__(
 37        self,
 38        job_directory: str | None = None,
 39        secret_phrases: tuple[str, ...] | None = None,
 40    ) -> None:
 41        # Compute initial job directory path or default; may switch to None for STDIN mode later
 42        initial_job_dir = (
 43            job_directory
 44            if job_directory is not None
 45            else (sys.argv[1] if len(sys.argv) > 1 else "/job")
 46        )
 47        self.job_directory: str | None = initial_job_dir
 48        self.secret_phrases = DEFAULT_SECRET_PHRASES if secret_phrases is None else secret_phrases
 49        # Load input
 50        self._input: dict[str, Any] = {}
 51        input_path = f"{self.job_directory}/input/input.json"
 52        if os.path.isfile(input_path):
 53            with open(input_path) as f_input:
 54                self._input = json.load(f_input)
 55        else:
 56            # If input file doesn't exist, read input from STDIN (with timeout)
 57            self.job_directory = None
 58            self.__set_encoding()
 59            is_tty = True
 60            with suppress(Exception):
 61                is_tty = bool(getattr(sys.stdin, "isatty", lambda: True)())
 62            if not is_tty:
 63                # Try a non-blocking readiness check where supported; fall back gracefully
 64                try:
 65                    fileno = sys.stdin.fileno()  # type: ignore[attr-defined]
 66                except Exception:
 67                    # e.g., StringIO without fileno(): try reading directly
 68                    try:
 69                        self._input = json.load(sys.stdin)
 70                    except Exception:
 71                        self.error(f"No input: missing '{input_path}' and STDIN.")
 72                else:
 73                    try:
 74                        rlist, _, _ = select.select([sys.stdin], [], [], self.READ_TIMEOUT)
 75                        if rlist:
 76                            self._input = json.load(sys.stdin)
 77                        else:
 78                            self.error(f"No input: missing '{input_path}' and STDIN.")
 79                    except Exception:
 80                        # If select is unsupported, attempt direct read
 81                        try:
 82                            self._input = json.load(sys.stdin)
 83                        except Exception:
 84                            self.error(f"No input: missing '{input_path}' and STDIN.")
 85            else:
 86                self.error(f"No input: missing '{input_path}' and STDIN.")
 87
 88        # Set parameters
 89        self.data_type = self.get_param("dataType", None, "Missing dataType field")
 90        self.tlp = self.get_param("tlp", 2)
 91        self.pap = self.get_param("pap", 2)
 92
 93        self.enable_check_tlp = self.get_param("config.check_tlp", False)
 94        self.max_tlp = self.get_param("config.max_tlp", 2)
 95
 96        self.enable_check_pap = self.get_param("config.check_pap", False)
 97        self.max_pap = self.get_param("config.max_pap", 2)
 98
 99        # Set proxy configuration if available
100        self.http_proxy = self.get_param("config.proxy.http")
101        self.https_proxy = self.get_param("config.proxy.https")
102
103        self.__set_proxies()
104
105        # Finally run check tlp/pap
106        if not (self.__check_tlp()):
107            self.error("TLP is higher than allowed.")
108
109        if not (self.__check_pap()):
110            self.error("PAP is higher than allowed.")
111
112    def __set_proxies(self) -> None:
113        EnvProxyConfigurator().set_environ(self.http_proxy, self.https_proxy)
114
115    @staticmethod
116    def __set_encoding() -> None:
117        """Ensure stdout/stderr use UTF-8 writers when not already UTF-8."""
118        with suppress(Exception):
119            ensure_utf8_streams()
120
121    def __get_param(
122        self,
123        source: Mapping[str, Any],
124        name: str | list[str],
125        default: Any | None = None,
126        message: str | None = None,
127    ) -> Any:
128        """Extract a specific parameter from given source.
129
130        :param source: Python dict to search through
131        :param name: Name of the parameter to get. JSON-like syntax,
132                     e.g. `config.username` at first, but in recursive calls a list
133        :param default: Default value, if not found. Default: None
134        :param message: Error message. If given and name not found, exit with error.
135                        Default: None
136        """
137        if isinstance(name, str):
138            name = name.split(".")
139
140        if len(name) == 0:
141            # The name is empty, return the source content
142            return source
143        new_source = source.get(name[0])
144        if new_source is not None:
145            return self.__get_param(new_source, name[1:], default, message)
146        if message is not None:
147            self.error(message)
148        return default
149
150    def __check_tlp(self) -> bool:
151        """Check if TLP is within allowed range; return False if too high."""
152        return not (self.enable_check_tlp and self.tlp > self.max_tlp)
153
154    def __check_pap(self) -> bool:
155        """Check if PAP is within allowed range; return False if too high."""
156        return not (self.enable_check_pap and self.pap > self.max_pap)
157
158    def __write_output(self, data: dict[str, Any], ensure_ascii: bool = False) -> None:
159        JsonOutputWriter().write(data, self.job_directory, ensure_ascii=ensure_ascii)
160
161    def get_data(self) -> Any:
162        """Return data from input dict.
163
164        :return: Data (observable value) given through Cortex
165        """
166        return self.get_param("data", None, "Missing data field")
167
168    @staticmethod
169    def build_operation(op_type: str, **parameters: Any) -> dict[str, Any]:
170        """
171        Build an operation descriptor.
172
173        :param op_type: an operation type as a string
174        :param parameters: a dict including the operation's params
175        :return: dict
176        """
177        operation = {"type": op_type}
178        operation.update(parameters)
179
180        return operation
181
182    def operations(self, raw: Any) -> list[dict[str, Any]]:
183        """Return the list of operations to execute after the job completes.
184
185        :returns: by default return an empty array
186        """
187        return []
188
189    def get_param(self, name: str, default: Any | None = None, message: str | None = None) -> Any:
190        """Dotted access into the input JSON; errors when `message` is provided."""
191        return self.__get_param(self._input, name, default, message)
192
193    def get_env(self, key: str, default: Any | None = None, message: str | None = None) -> Any:
194        """
195        Wrap access to configuration values from the environment.
196
197        :param key: Key of the environment variable to get.
198        :param default: Default value, if not found. Default: None
199        :param message: Error message. If given and key not found, exit with error.
200                        Default: None
201        """
202        if key in os.environ:
203            return os.environ[key]
204        if message is not None:
205            self.error(message)
206        return default
207
208    def error(self, message: str, ensure_ascii: bool = False) -> NoReturn:
209        """
210        Stop analyzer with an error message.
211
212        Changing ensure_ascii can be helpful when stucking with ascii <-> utf-8 issues.
213        Additionally, the input as returned, too.
214        Maybe helpful when dealing with errors.
215
216        :param message: Error message
217        :param ensure_ascii: Force ascii output. Default: False
218        """
219        # Get analyzer input
220        analyzer_input = self._input
221
222        # Loop over all the sensitive config names and clean them
223        analyzer_input["config"] = sanitize_config(
224            analyzer_input.get("config", {}), self.secret_phrases
225        )
226
227        self.__write_output(
228            {"success": False, "input": analyzer_input, "errorMessage": message},
229            ensure_ascii=ensure_ascii,
230        )
231
232        # Force exit after error
233        sys.exit(1)
234
235    def summary(self, raw: Any) -> dict[str, Any]:
236        """Return a summary for 'short.html' template.
237
238        Overwrite it for your needs!
239
240        :returns: by default return an empty dict
241        """
242        return {}
243
244    def artifacts(self, raw: Any) -> list[dict[str, Any]]:
245        """Return a list of artifacts (empty by default)."""
246        return []
247
248    def report(self, output: dict[str, Any], ensure_ascii: bool = False) -> None:
249        """Return a JSON dict via stdout.
250
251        :param output: worker output.
252        :param ensure_ascii: Force ascii output. Default: False
253        """
254        self.__write_output(output, ensure_ascii=ensure_ascii)
255
256    @abstractmethod
257    def run(self) -> None:
258        """Entry point to implement in subclasses."""

Common functionality for analyzers and responders.

READ_TIMEOUT = 3
job_directory: str | None
secret_phrases
data_type
tlp
pap
enable_check_tlp
max_tlp
enable_check_pap
max_pap
http_proxy
https_proxy
def get_data(self) -> Any:
161    def get_data(self) -> Any:
162        """Return data from input dict.
163
164        :return: Data (observable value) given through Cortex
165        """
166        return self.get_param("data", None, "Missing data field")

Return data from input dict.

Returns

Data (observable value) given through Cortex

@staticmethod
def build_operation(op_type: str, **parameters: Any) -> dict[str, typing.Any]:
168    @staticmethod
169    def build_operation(op_type: str, **parameters: Any) -> dict[str, Any]:
170        """
171        Build an operation descriptor.
172
173        :param op_type: an operation type as a string
174        :param parameters: a dict including the operation's params
175        :return: dict
176        """
177        operation = {"type": op_type}
178        operation.update(parameters)
179
180        return operation

Build an operation descriptor.

Parameters
  • op_type: an operation type as a string
  • parameters: a dict including the operation's params
Returns

dict

def operations(self, raw: Any) -> list[dict[str, typing.Any]]:
182    def operations(self, raw: Any) -> list[dict[str, Any]]:
183        """Return the list of operations to execute after the job completes.
184
185        :returns: by default return an empty array
186        """
187        return []

Return the list of operations to execute after the job completes.

Returns an empty list by default.

def get_param( self, name: str, default: typing.Any | None = None, message: str | None = None) -> Any:
189    def get_param(self, name: str, default: Any | None = None, message: str | None = None) -> Any:
190        """Dotted access into the input JSON; errors when `message` is provided."""
191        return self.__get_param(self._input, name, default, message)

Dotted access into the input JSON; errors when message is provided.

def get_env( self, key: str, default: typing.Any | None = None, message: str | None = None) -> Any:
193    def get_env(self, key: str, default: Any | None = None, message: str | None = None) -> Any:
194        """
195        Wrap access to configuration values from the environment.
196
197        :param key: Key of the environment variable to get.
198        :param default: Default value, if not found. Default: None
199        :param message: Error message. If given and key not found, exit with error.
200                        Default: None
201        """
202        if key in os.environ:
203            return os.environ[key]
204        if message is not None:
205            self.error(message)
206        return default

Wrap access to configuration values from the environment.

Parameters
  • key: Key of the environment variable to get.
  • default: Default value, if not found. Default: None
  • message: Error message. If given and key not found, exit with error. Default: None
def error(self, message: str, ensure_ascii: bool = False) -> NoReturn:
208    def error(self, message: str, ensure_ascii: bool = False) -> NoReturn:
209        """
210        Stop analyzer with an error message.
211
212        Changing ensure_ascii can be helpful when stucking with ascii <-> utf-8 issues.
213        Additionally, the input as returned, too.
214        Maybe helpful when dealing with errors.
215
216        :param message: Error message
217        :param ensure_ascii: Force ascii output. Default: False
218        """
219        # Get analyzer input
220        analyzer_input = self._input
221
222        # Loop over all the sensitive config names and clean them
223        analyzer_input["config"] = sanitize_config(
224            analyzer_input.get("config", {}), self.secret_phrases
225        )
226
227        self.__write_output(
228            {"success": False, "input": analyzer_input, "errorMessage": message},
229            ensure_ascii=ensure_ascii,
230        )
231
232        # Force exit after error
233        sys.exit(1)

Stop analyzer with an error message.

Changing ensure_ascii can help when you are stuck with ASCII <-> UTF-8 issues. Additionally, the input is returned as well, which may be helpful when dealing with errors.

Parameters
  • message: Error message
  • ensure_ascii: Force ascii output. Default: False
def summary(self, raw: Any) -> dict[str, typing.Any]:
235    def summary(self, raw: Any) -> dict[str, Any]:
236        """Return a summary for 'short.html' template.
237
238        Overwrite it for your needs!
239
240        :returns: by default return an empty dict
241        """
242        return {}

Return a summary for 'short.html' template.

Overwrite it for your needs!

Returns an empty dict by default.

def artifacts(self, raw: Any) -> list[dict[str, typing.Any]]:
244    def artifacts(self, raw: Any) -> list[dict[str, Any]]:
245        """Return a list of artifacts (empty by default)."""
246        return []

Return a list of artifacts (empty by default).

def report(self, output: dict[str, typing.Any], ensure_ascii: bool = False) -> None:
248    def report(self, output: dict[str, Any], ensure_ascii: bool = False) -> None:
249        """Return a JSON dict via stdout.
250
251        :param output: worker output.
252        :param ensure_ascii: Force ascii output. Default: False
253        """
254        self.__write_output(output, ensure_ascii=ensure_ascii)

Return a JSON dict via stdout.

Parameters
  • output: worker output.
  • ensure_ascii: Force ascii output. Default: False
@abstractmethod
def run(self) -> None:
256    @abstractmethod
257    def run(self) -> None:
258        """Entry point to implement in subclasses."""

Entry point to implement in subclasses.

def runner(worker_cls: type[~T]) -> None:
def runner(worker_cls: type[T]) -> None:
    """Build ``worker_cls`` with no arguments and invoke its ``run()`` method.

    :param worker_cls: a class whose instances expose ``run()``
    """
    instance: Runnable = worker_cls()  # type: ignore[call-arg]
    instance.run()

Instantiate and run a worker class with a run() method.