sentineliqsdk
Public package entry points for SentinelIQ SDK.
Exports the modern API and a convenience runner for workers.
1"""Public package entry points for SentinelIQ SDK. 2 3Exports the modern API and a convenience runner for workers. 4""" 5 6from __future__ import annotations 7 8from typing import Protocol, TypeVar 9 10from sentineliqsdk.analyzers import Analyzer 11from sentineliqsdk.core import Worker 12from sentineliqsdk.extractors import Extractor 13from sentineliqsdk.responders import Responder 14 15__all__ = ["Analyzer", "Extractor", "Responder", "Worker", "runner"] 16 17 18class Runnable(Protocol): 19 """Protocol for runnable workers exposing a ``run()`` method.""" 20 21 def run(self) -> None: # pragma: no cover - typing-only contract 22 ... 23 24 25T = TypeVar("T", bound=Runnable) 26 27 28def runner(worker_cls: type[T]) -> None: 29 """Instantiate and run a worker class with a ``run()`` method.""" 30 worker: Runnable = worker_cls() # type: ignore[call-arg] 31 worker.run()
class Analyzer(Worker):
    """Base class for analyzers with auto-extraction and helpers."""

    def __init__(
        self,
        job_directory: str | None = None,
        secret_phrases: tuple[str, ...] | None = None,
    ) -> None:
        super().__init__(job_directory, secret_phrases)
        # Whether artifacts() should auto-extract IOCs from the full report.
        self.auto_extract: bool = self.get_param("config.auto_extract", True)

    def get_data(self) -> Any:
        """Return the observable value, or the filename for `file` datatypes."""
        if self.data_type == "file":
            return self.get_param("filename", None, "Missing filename.")
        return self.get_param("data", None, "Missing data field")

    def get_param(self, name: str, default: Any | None = None, message: str | None = None) -> Any:
        """Resolve dotted name; special-case `file`/`filename` for job-dir absolute path.

        - When `dataType == "file"` and running in job-directory mode, `get_param("file")`
          maps to the underlying `filename` value and, if the file exists under
          `<job_dir>/input/`, returns its absolute path. If not found, returns the raw value.
        - `get_param("filename")` behaves the same in this context.
        """
        # "file" is a logical alias for the stored "filename" key.
        base_key = "filename" if name == "file" else name
        data = super().get_param(base_key, default, message)
        if (
            # After aliasing, only "filename" can ever match here (the former
            # `base_key in {"file", "filename"}` check carried a dead member).
            base_key == "filename"
            and self.data_type == "file"
            and self.job_directory is not None
            and isinstance(data, str)
        ):
            path = f"{self.job_directory}/input/{data}"
            if os.path.isfile(path):
                return path
        return data

    def build_taxonomy(
        self, level: TaxonomyLevel, namespace: str, predicate: str, value: str
    ) -> dict:
        """Create a normalized taxonomy entry for report metadata.

        Levels outside the known set are coerced to "info".
        """
        if level not in ("info", "safe", "suspicious", "malicious"):
            level = "info"
        return {
            "level": level,
            "namespace": namespace,
            "predicate": predicate,
            "value": value,
        }

    def summary(self, raw: Any) -> dict:
        """Return analyzer-specific short summary (optional); override as needed."""
        return {}

    def artifacts(self, raw: Any) -> list[dict]:
        """Auto-extract IOCs from the full report when enabled."""
        if self.auto_extract:
            # Ignore the original observable so it is not re-reported as an IOC.
            extractor = Extractor(ignore=self.get_data())
            return extractor.check_iterable(raw)
        return []

    def _copy_file_to_output(self, src_path: str) -> dict | None:
        """Copy a source file into `<job_dir>/output/` and return file metadata.

        When running in STDIN mode (`job_directory is None`), this returns None to
        avoid creating an `output/` directory in the current working directory.
        """
        if self.job_directory is None:
            return None
        if not os.path.isfile(src_path):
            return None
        # The `is None` guard above makes a fallback default unnecessary here.
        output_dir = os.path.join(self.job_directory, "output")
        os.makedirs(output_dir, exist_ok=True)
        # NamedTemporaryFile provides a unique destination name inside output/.
        with tempfile.NamedTemporaryFile(dir=output_dir, delete=False) as dst:
            with open(src_path, "rb") as src:
                copyfileobj(src, dst)
            dstfname = dst.name
        # Mark the copied artifact read-only.
        os.chmod(dstfname, 0o444)
        return {"file": os.path.basename(dstfname), "filename": os.path.basename(src_path)}

    def build_artifact(self, data_type: str, data: Any, **kwargs: Any) -> dict | None:
        """Build an artifact dict; copy files to output for `file` type.

        In STDIN mode (`job_directory is None`), `file` artifacts are skipped
        and this returns None.
        """
        if data_type == "file":
            file_fields = self._copy_file_to_output(str(data))
            if file_fields is None:
                return None
            return {"dataType": data_type, **file_fields, **kwargs}
        return {"dataType": data_type, "data": data, **kwargs}

    def report(self, full_report: dict, ensure_ascii: bool = False) -> None:
        """Wrap full report with SDK envelope and write JSON output.

        :param full_report: Analyzer results as dict.
        :param ensure_ascii: Force ascii output. Default: False
        """
        summary: dict = {}
        # summary()/operations() are user-provided; never let them break reporting.
        with suppress(Exception):
            summary = self.summary(full_report)
        operation_list: list[dict] = []
        with suppress(Exception):
            operation_list = self.operations(full_report)
        super().report(
            {
                "success": True,
                "summary": summary,
                "artifacts": self.artifacts(full_report),
                "operations": operation_list,
                "full": full_report,
            },
            ensure_ascii,
        )

    def run(self) -> None:  # pragma: no cover - to be overridden
        """Override in subclasses."""
Base class for analyzers with auto-extraction and helpers.
def get_data(self) -> Any:
    """Return the observable value, or the filename for `file` datatypes."""
    if self.data_type != "file":
        return self.get_param("data", None, "Missing data field")
    return self.get_param("filename", None, "Missing filename.")
Return the observable value or filename for file datatypes.
def get_param(self, name: str, default: Any | None = None, message: str | None = None) -> Any:
    """Resolve dotted name; special-case `file`/`filename` for job-dir absolute path.

    - When `dataType == "file"` and running in job-directory mode, `get_param("file")`
      maps to the underlying `filename` value and, if the file exists under
      `<job_dir>/input/`, returns its absolute path. If not found, returns the raw value.
    - `get_param("filename")` behaves the same in this context.
    """
    # "file" is a logical alias for the stored "filename" key.
    base_key = name if name != "file" else "filename"
    value = super().get_param(base_key, default, message)
    resolvable = (
        base_key in {"file", "filename"}
        and self.data_type == "file"
        and self.job_directory is not None
        and isinstance(value, str)
    )
    if resolvable:
        candidate = f"{self.job_directory}/input/{value}"
        if os.path.isfile(candidate):
            return candidate
    return value
Resolve dotted name; special-case file/filename for job-dir absolute path.
- When
dataType == "file"and running in job-directory mode,get_param("file")maps to the underlyingfilenamevalue and, if the file exists under<job_dir>/input/, returns its absolute path. If not found, returns the raw value. get_param("filename")behaves the same in this context.
def build_taxonomy(
    self, level: TaxonomyLevel, namespace: str, predicate: str, value: str
) -> dict:
    """Create a normalized taxonomy entry for report metadata.

    Levels outside the known set are coerced to "info".
    """
    known_levels = ("info", "safe", "suspicious", "malicious")
    normalized = level if level in known_levels else "info"
    return {
        "level": normalized,
        "namespace": namespace,
        "predicate": predicate,
        "value": value,
    }
Create a normalized taxonomy entry for report metadata.
def summary(self, raw: Any) -> dict:
    """Return an analyzer-specific short summary; empty dict by default."""
    return {}
Return analyzer-specific short summary (optional).
def artifacts(self, raw: Any) -> list[dict]:
    """Auto-extract IOCs from the full report when enabled."""
    if not self.auto_extract:
        return []
    # Skip the original observable so it is not re-reported as an IOC.
    return Extractor(ignore=self.get_data()).check_iterable(raw)
Auto-extract IOCs from the full report when enabled.
def build_artifact(self, data_type: str, data: Any, **kwargs: Any) -> dict | None:
    """Build an artifact dict; copy files to output for `file` type.

    In STDIN mode (`job_directory is None`), `file` artifacts are skipped and
    this returns None.
    """
    if data_type != "file":
        return {"dataType": data_type, "data": data, **kwargs}
    file_fields = self._copy_file_to_output(str(data))
    if file_fields is None:
        return None
    return {"dataType": data_type, **file_fields, **kwargs}
Build an artifact dict; copy files to output for file type.
In STDIN mode (job_directory is None), file artifacts are skipped
and this returns None.
def report(self, full_report: dict, ensure_ascii: bool = False) -> None:
    """Wrap the full report with the SDK envelope and write JSON output."""
    short_summary: dict = {}
    # summary()/operations() are user-provided; never let them break reporting.
    with suppress(Exception):
        short_summary = self.summary(full_report)
    ops: list[dict] = []
    with suppress(Exception):
        ops = self.operations(full_report)
    envelope = {
        "success": True,
        "summary": short_summary,
        "artifacts": self.artifacts(full_report),
        "operations": ops,
        "full": full_report,
    }
    super().report(envelope, ensure_ascii)
Wrap full report with SDK envelope and write JSON output.
class Extractor:
    """Detect IOC attribute types using stdlib-backed heuristics.

    Two functions are provided:
    - ``check_string(str)`` which checks a string and returns the type.
    - ``check_iterable(itr)`` that iterates over a list or a dictionary and returns a
      list of {type, value} dicts.

    Note: not a full-text search; IOC values must appear as isolated strings.

    :param ignore: String to ignore when matching artifacts to type
    """

    def __init__(self, ignore: str | None = None):
        self.ignore = ignore
        # Memoize classifications per (ignore, value) pair.
        self._cache: dict[tuple[str | None, str], str] = {}

    # --- Type checks ---
    @staticmethod
    def _is_ip(value: str) -> bool:
        try:
            ipaddress.ip_address(value)
        except ValueError:
            return False
        return True

    @staticmethod
    def _is_url(value: str) -> bool:
        if value.startswith(("http://", "https://")):
            parsed = urlparse(value)
            return parsed.scheme in {"http", "https"} and bool(parsed.netloc)
        return False

    @staticmethod
    def _label_allowed(label: str) -> bool:
        allowed = set(string.ascii_letters + string.digits + "_-")
        if not label:
            return False
        return all(ch in allowed for ch in label)

    @classmethod
    def _is_domain(cls, value: str) -> bool:
        if value.startswith(("http://", "https://")):
            return False
        pieces = value.split(".")
        if len(pieces) != DOMAIN_PARTS:
            return False
        left, tld = pieces
        return cls._label_allowed(left) and tld.isalpha()

    @staticmethod
    def _is_hash(value: str) -> bool:
        # MD5 / SHA-1 / SHA-256 hex digest lengths.
        return len(value) in {32, 40, 64} and all(c in string.hexdigits for c in value)

    @staticmethod
    def _is_user_agent(value: str) -> bool:
        return value.startswith(("Mozilla/4.0 ", "Mozilla/5.0 "))

    @staticmethod
    def _is_uri_path(value: str) -> bool:
        if value.startswith(("http://", "https://")):
            return False
        return bool(urlparse(value).scheme) and "://" in value

    @staticmethod
    def _is_registry(value: str) -> bool:
        prefixes = ("HKEY", "HKLM", "HKCU", "HKCR", "HKCC")
        return value.startswith(prefixes) and "\\" in value

    @staticmethod
    def _is_mail(value: str) -> bool:
        _, addr = parseaddr(value)
        if addr != value or "@" not in addr:
            return False
        local, _, domain = addr.partition("@")
        return bool(local) and bool(domain) and "." in domain

    @classmethod
    def _is_fqdn(cls, value: str) -> bool:
        if value.startswith(("http://", "https://")):
            return False
        pieces = value.split(".")
        if len(pieces) < MIN_FQDN_LABELS:
            return False
        *labels, tld = pieces
        if not all(cls._label_allowed(lbl) for lbl in labels):
            return False
        return tld.isalpha()

    def __checktype(self, value: Any) -> str:
        """Classify a value.

        :param value: The value to check
        :return: Data type of value, if known, else empty string
        """
        # Ignore only exact matches to avoid hiding valid IOCs that merely
        # contain the observable as a substring.
        if self.ignore and isinstance(value, str) and value == self.ignore:
            return ""

        if not isinstance(value, str):
            return ""

        key = (self.ignore, value)
        cached = self._cache.get(key)
        if cached is not None:
            return cached

        # Ordered predicate table: first match wins.
        checks: list[tuple[Callable[[str], bool], str]] = [
            (self._is_ip, "ip"),
            (self._is_url, "url"),
            (self._is_domain, "domain"),
            (self._is_hash, "hash"),
            (self._is_user_agent, "user-agent"),
            (self._is_uri_path, "uri_path"),
            (self._is_registry, "registry"),
            (self._is_mail, "mail"),
            (self._is_fqdn, "fqdn"),
        ]
        dtype = next((name for predicate, name in checks if predicate(value)), "")

        self._cache[key] = dtype
        return dtype

    def check_string(self, value: str) -> str:
        """Check if a string matches a datatype.

        :param value: String to test
        :return: Data type or empty string
        """
        return self.__checktype(value)

    def check_iterable(self, iterable: Any) -> list[dict[str, str]]:
        """Check values of a list or a dict for IOCs.

        Returns a list of {dataType, data} dicts. Raises TypeError if iterable
        is not an expected type.

        :param iterable: List, dict or str of values
        :return: Deduplicated list of detected IOCs
        """
        if not isinstance(iterable, (str, list, dict)):
            raise TypeError("Not supported type.")

        found: list[dict[str, str]] = []
        pending: list[Any] = [iterable]
        while pending:
            current = pending.pop()
            if isinstance(current, dict):
                pending.extend(current.values())
            elif isinstance(current, list):
                pending.extend(current)
            elif isinstance(current, str):
                dtype = self.__checktype(current)
                if dtype:
                    found.append({"dataType": dtype, "data": current})

        return self.deduplicate(found)

    @staticmethod
    def deduplicate(list_of_objects: list[dict[str, str]]) -> list[dict[str, str]]:
        """Deduplicate list of IOC objects by type + data in O(n)."""
        seen: set[tuple[str, str]] = set()
        unique: list[dict[str, str]] = []
        for entry in list_of_objects:
            marker = (entry["dataType"], entry["data"])
            if marker not in seen:
                seen.add(marker)
                unique.append(entry)
        return unique
Detect IOC attribute types using stdlib-backed heuristics.
Two functions are provided:
`check_string(str)`, which checks a string and returns its type, and `check_iterable(itr)`, which iterates over a list or a dictionary and returns a list of {type, value} dicts.
Note: not a full-text search; IOC values must appear as isolated strings.
Parameters
- ignore: String to ignore when matching artifacts to type
def check_string(self, value: str) -> str:
    """Classify a single string.

    :param value: String to test
    :return: Matching datatype name, or an empty string when unknown
    """
    return self.__checktype(value)
Check if a string matches a datatype.
Parameters
- value: String to test
Returns
Data type or empty string
def check_iterable(self, iterable: Any) -> list[dict[str, str]]:
    """Check values of a list or a dict for IOCs.

    Returns a list of {dataType, data} dicts. Raises TypeError if iterable is
    not an expected type.

    :param iterable: List, dict or str of values
    :return: Deduplicated list of detected IOCs
    """
    if not isinstance(iterable, (str, list, dict)):
        raise TypeError("Not supported type.")

    found: list[dict[str, str]] = []
    pending: list[Any] = [iterable]
    while pending:
        current = pending.pop()
        if isinstance(current, dict):
            pending.extend(current.values())
        elif isinstance(current, list):
            pending.extend(current)
        elif isinstance(current, str):
            dtype = self.__checktype(current)
            if dtype:
                found.append({"dataType": dtype, "data": current})
    return self.deduplicate(found)
Check values of a list or a dict for IOCs.
Returns a list of dict {type, value}. Raises TypeError if iterable is not an expected type.
Parameters
- iterable: List or dict of values
Returns
List of detected IOCs
201 @staticmethod 202 def deduplicate(list_of_objects: list[dict[str, str]]) -> list[dict[str, str]]: 203 """Deduplicate list of IOC objects by type + data in O(n).""" 204 seen: set[tuple[str, str]] = set() 205 dedup_list: list[dict[str, str]] = [] 206 for obj in list_of_objects: 207 key = (obj["dataType"], obj["data"]) 208 if key in seen: 209 continue 210 seen.add(key) 211 dedup_list.append(obj) 212 return dedup_list
Deduplicate list of IOC objects by type + data in O(n).
class Responder(Worker):
    """Base class for responders."""

    def __init__(
        self,
        job_directory: str | None = None,
        secret_phrases: tuple[str, ...] | None = None,
    ) -> None:
        super().__init__(job_directory, secret_phrases)

    def get_data(self):
        """Return data from input dict.

        :return: Data (observable value) given through Cortex
        """
        return self.get_param("data", None, "Missing data field")

    def report(self, full_report: dict, ensure_ascii: bool = False) -> None:
        """Return a JSON dict via stdout.

        :param full_report: Responder results as dict.
        :param ensure_ascii: Force ascii output. Default: False
        """
        operation_list: list[dict] = []
        # operations() is user-provided; never let it break reporting.
        with suppress(Exception):
            operation_list = self.operations(full_report)
        super().report(
            {"success": True, "full": full_report, "operations": operation_list},
            ensure_ascii,
        )

    def run(self) -> None:
        """Overwritten by responders."""
Base class for responders.
def get_data(self):
    """Return the observable value from the job input.

    :return: Data (observable value) given through Cortex
    """
    missing_msg = "Missing data field"
    return self.get_param("data", None, missing_msg)
Return data from input dict.
Returns
Data (observable value) given through Cortex
def report(self, full_report, ensure_ascii: bool = False):
    """Write the responder result envelope as JSON via stdout.

    :param full_report: Responder results as dict.
    :param ensure_ascii: Force ascii output. Default: False
    """
    ops = []
    # operations() is user-provided; never let it break reporting.
    with suppress(Exception):
        ops = self.operations(full_report)
    envelope = {"success": True, "full": full_report, "operations": ops}
    super().report(envelope, ensure_ascii)
Return a JSON dict via stdout.
Parameters
- full_report: Responder results as dict.
- ensure_ascii: Force ascii output. Default: False
class Worker(ABC):
    """Common functionality for analyzers and responders."""

    READ_TIMEOUT = 3  # seconds

    def __init__(
        self,
        job_directory: str | None = None,
        secret_phrases: tuple[str, ...] | None = None,
    ) -> None:
        # Resolve the job directory: explicit argument, then argv[1], then "/job".
        if job_directory is None:
            job_directory = sys.argv[1] if len(sys.argv) > 1 else "/job"
        self.job_directory: str | None = job_directory
        self.secret_phrases = (
            DEFAULT_SECRET_PHRASES if secret_phrases is None else secret_phrases
        )

        # Load input from <job_dir>/input/input.json, or fall back to STDIN.
        self._input: dict[str, Any] = {}
        input_path = f"{self.job_directory}/input/input.json"
        if os.path.isfile(input_path):
            with open(input_path) as f_input:
                self._input = json.load(f_input)
        else:
            # No job file: switch to STDIN mode (job_directory becomes None).
            self.job_directory = None
            self.__set_encoding()
            self.__read_input_from_stdin(input_path)

        # Core parameters from the input envelope.
        self.data_type = self.get_param("dataType", None, "Missing dataType field")
        self.tlp = self.get_param("tlp", 2)
        self.pap = self.get_param("pap", 2)

        self.enable_check_tlp = self.get_param("config.check_tlp", False)
        self.max_tlp = self.get_param("config.max_tlp", 2)

        self.enable_check_pap = self.get_param("config.check_pap", False)
        self.max_pap = self.get_param("config.max_pap", 2)

        # Proxy configuration, if provided.
        self.http_proxy = self.get_param("config.proxy.http")
        self.https_proxy = self.get_param("config.proxy.https")
        self.__set_proxies()

        # Enforce TLP/PAP limits up front.
        if not self.__check_tlp():
            self.error("TLP is higher than allowed.")
        if not self.__check_pap():
            self.error("PAP is higher than allowed.")

    def __read_input_from_stdin(self, input_path: str) -> None:
        """Populate ``self._input`` from STDIN, with a readiness timeout where possible."""
        is_tty = True
        with suppress(Exception):
            is_tty = bool(getattr(sys.stdin, "isatty", lambda: True)())
        if is_tty:
            self.error(f"No input: missing '{input_path}' and STDIN.")
        # Prefer a non-blocking readiness check; fall back to a direct read
        # when the stream has no real file descriptor or select() is missing.
        try:
            sys.stdin.fileno()  # type: ignore[attr-defined]
        except Exception:
            # e.g. StringIO without fileno(): try reading directly.
            try:
                self._input = json.load(sys.stdin)
            except Exception:
                self.error(f"No input: missing '{input_path}' and STDIN.")
            return
        try:
            ready, _, _ = select.select([sys.stdin], [], [], self.READ_TIMEOUT)
            if not ready:
                # error() raises SystemExit, which `except Exception` ignores.
                self.error(f"No input: missing '{input_path}' and STDIN.")
            self._input = json.load(sys.stdin)
        except Exception:
            # select() unsupported on this platform/stream: attempt direct read.
            try:
                self._input = json.load(sys.stdin)
            except Exception:
                self.error(f"No input: missing '{input_path}' and STDIN.")

    def __set_proxies(self) -> None:
        EnvProxyConfigurator().set_environ(self.http_proxy, self.https_proxy)

    @staticmethod
    def __set_encoding() -> None:
        """Ensure stdout/stderr use UTF-8 writers when not already UTF-8."""
        with suppress(Exception):
            ensure_utf8_streams()

    def __get_param(
        self,
        source: Mapping[str, Any],
        name: str | list[str],
        default: Any | None = None,
        message: str | None = None,
    ) -> Any:
        """Extract a (possibly dotted) parameter from *source*.

        :param source: Mapping to search through
        :param name: Dotted key (e.g. ``config.username``), or the remaining
            key parts as a list in recursive calls
        :param default: Value returned when the key is absent. Default: None
        :param message: If given and the key is missing, exit with this error.
        """
        parts = name.split(".") if isinstance(name, str) else name
        if not parts:
            # Path exhausted: source is the requested value itself.
            return source
        child = source.get(parts[0])
        if child is not None:
            return self.__get_param(child, parts[1:], default, message)
        if message is not None:
            self.error(message)
        return default

    def __check_tlp(self) -> bool:
        """Return False when TLP checking is enabled and TLP exceeds the maximum."""
        return not (self.enable_check_tlp and self.tlp > self.max_tlp)

    def __check_pap(self) -> bool:
        """Return False when PAP checking is enabled and PAP exceeds the maximum."""
        return not (self.enable_check_pap and self.pap > self.max_pap)

    def __write_output(self, data: dict[str, Any], ensure_ascii: bool = False) -> None:
        JsonOutputWriter().write(data, self.job_directory, ensure_ascii=ensure_ascii)

    def get_data(self) -> Any:
        """Return data from input dict.

        :return: Data (observable value) given through Cortex
        """
        return self.get_param("data", None, "Missing data field")

    @staticmethod
    def build_operation(op_type: str, **parameters: Any) -> dict[str, Any]:
        """Build an operation descriptor.

        :param op_type: an operation type as a string
        :param parameters: the operation's parameters
        :return: dict combining the type and its parameters
        """
        return {"type": op_type, **parameters}

    def operations(self, raw: Any) -> list[dict[str, Any]]:
        """Return the list of operations to execute after the job completes.

        :returns: by default return an empty array
        """
        return []

    def get_param(self, name: str, default: Any | None = None, message: str | None = None) -> Any:
        """Dotted access into the input JSON; errors when `message` is provided."""
        return self.__get_param(self._input, name, default, message)

    def get_env(self, key: str, default: Any | None = None, message: str | None = None) -> Any:
        """Wrap access to configuration values from the environment.

        :param key: Key of the environment variable to get.
        :param default: Default value, if not found. Default: None
        :param message: Error message. If given and key not found, exit with error.
        """
        try:
            return os.environ[key]
        except KeyError:
            if message is not None:
                self.error(message)
            return default

    def error(self, message: str, ensure_ascii: bool = False) -> NoReturn:
        """Stop the worker with an error message, then exit(1).

        Changing ensure_ascii can be helpful for ascii <-> utf-8 issues; the
        (sanitized) input is echoed back to ease debugging.

        :param message: Error message
        :param ensure_ascii: Force ascii output. Default: False
        """
        analyzer_input = self._input
        # Scrub sensitive configuration values before echoing the input back.
        analyzer_input["config"] = sanitize_config(
            analyzer_input.get("config", {}), self.secret_phrases
        )
        self.__write_output(
            {"success": False, "input": analyzer_input, "errorMessage": message},
            ensure_ascii=ensure_ascii,
        )
        # Force exit after error.
        sys.exit(1)

    def summary(self, raw: Any) -> dict[str, Any]:
        """Return a summary for 'short.html' template.

        Overwrite it for your needs!

        :returns: by default return an empty dict
        """
        return {}

    def artifacts(self, raw: Any) -> list[dict[str, Any]]:
        """Return a list of artifacts (empty by default)."""
        return []

    def report(self, output: dict[str, Any], ensure_ascii: bool = False) -> None:
        """Return a JSON dict via stdout.

        :param output: worker output.
        :param ensure_ascii: Force ascii output. Default: False
        """
        self.__write_output(output, ensure_ascii=ensure_ascii)

    @abstractmethod
    def run(self) -> None:
        """Entry point to implement in subclasses."""
Common functionality for analyzers and responders.
def get_data(self) -> Any:
    """Return the observable value from the job input.

    :return: Data (observable value) given through Cortex
    """
    lookup = ("data", None, "Missing data field")
    return self.get_param(*lookup)
Return data from input dict.
Returns
Data (observable value) given through Cortex
168 @staticmethod 169 def build_operation(op_type: str, **parameters: Any) -> dict[str, Any]: 170 """ 171 Build an operation descriptor. 172 173 :param op_type: an operation type as a string 174 :param parameters: a dict including the operation's params 175 :return: dict 176 """ 177 operation = {"type": op_type} 178 operation.update(parameters) 179 180 return operation
Build an operation descriptor.
Parameters
- op_type: an operation type as a string
- parameters: a dict including the operation's params
Returns
dict
def operations(self, raw: Any) -> list[dict[str, Any]]:
    """Return the list of operations to execute after the job completes.

    :returns: an empty list by default; override to schedule follow-up operations
    """
    return []
Return the list of operations to execute after the job completes.
:returns: by default return an empty array
def get_param(self, name: str, default: Any | None = None, message: str | None = None) -> Any:
    """Dotted access into the input JSON; exits with `message` when given and the key is missing."""
    return self.__get_param(self._input, name, default, message)
Dotted access into the input JSON; errors when message is provided.
def get_env(self, key: str, default: Any | None = None, message: str | None = None) -> Any:
    """Wrap access to configuration values from the environment.

    :param key: Key of the environment variable to get.
    :param default: Default value, if not found. Default: None
    :param message: Error message. If given and key not found, exit with error.
    """
    try:
        return os.environ[key]
    except KeyError:
        if message is not None:
            self.error(message)
        return default
Wrap access to configuration values from the environment.
Parameters
- key: Key of the environment variable to get.
- default: Default value, if not found. Default: None
- message: Error message. If given and key not found, exit with error. Default: None
def error(self, message: str, ensure_ascii: bool = False) -> NoReturn:
    """Stop the worker, emitting a sanitized error envelope, then exit(1).

    Changing ensure_ascii can be helpful for ascii <-> utf-8 issues; the
    (sanitized) input is echoed back to ease debugging.

    :param message: Error message
    :param ensure_ascii: Force ascii output. Default: False
    """
    payload = self._input
    # Scrub sensitive configuration entries before echoing the input back.
    payload["config"] = sanitize_config(payload.get("config", {}), self.secret_phrases)
    self.__write_output(
        {"success": False, "input": payload, "errorMessage": message},
        ensure_ascii=ensure_ascii,
    )
    sys.exit(1)
Stop analyzer with an error message.
Changing ensure_ascii can be helpful when struggling with ascii <-> utf-8 issues. Additionally, the (sanitized) input is returned as well, which may be helpful when dealing with errors.
Parameters
- message: Error message
- ensure_ascii: Force ascii output. Default: False
def summary(self, raw: Any) -> dict[str, Any]:
    """Return a summary for the 'short.html' template.

    Overwrite it for your needs!

    :returns: an empty dict by default
    """
    return {}
Return a summary for 'short.html' template.
Overwrite it for your needs!
:returns: by default return an empty dict
def artifacts(self, raw: Any) -> list[dict[str, Any]]:
    """Return artifacts derived from the report; empty by default."""
    return []
Return a list of artifacts (empty by default).
def report(self, output: dict[str, Any], ensure_ascii: bool = False) -> None:
    """Write the worker output as a JSON dict via stdout.

    :param output: worker output.
    :param ensure_ascii: Force ascii output. Default: False
    """
    self.__write_output(output, ensure_ascii=ensure_ascii)
Return a JSON dict via stdout.
Parameters
- output: worker output.
- ensure_ascii: Force ascii output. Default: False
def runner(worker_cls: type[T]) -> None:
    """Instantiate `worker_cls` with no arguments and call its `run()` method."""
    instance: Runnable = worker_cls()  # type: ignore[call-arg]
    instance.run()
Instantiate and run a worker class with a run() method.