Metadata-Version: 2.4
Name: ciaops
Version: 1.0.1
Summary: Python library - modules for processing data from the TI, ASM and DRP system collected in one library. This library simplifies work with the products API and gives you the flexibility to customize the search and retrieval of data from the system.
Author-email: Group-IB <integration@group-ib.com>
License: MIT
Keywords: group-ib,threat intelligence,digital risk protection,attack surface management,cybersecurity,ti,drp,asm,api client
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: Topic :: Security
Classifier: Typing :: Typed
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: License :: OSI Approved :: MIT License
Classifier: Natural Language :: English
Classifier: Operating System :: OS Independent
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: requests>=2.31.0
Requires-Dist: urllib3>=2.0.2
Provides-Extra: misp
Requires-Dist: pyaml>=25.7.0; extra == "misp"
Dynamic: license-file

# ciaops

[![Python](https://img.shields.io/badge/python-v3.10+-blue?logo=python)](https://python.org/downloads/release/python-3100/)

**ciaops** - Python library to communicate with **Company Products** (TI, DRP, ASM) via **API**.

## **License**

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## **Content**

- [ciaops](#ciaops)
  - [**License**](#license)
  - [**Content**](#content)
  - [**Installation**](#installation)
  - [**Usage**](#usage)
    - [Initialization](#initialization)
    - [Collection constants](#collection-constants)
    - [Collections mapping](#collections-mapping)
    - [Portions generator](#portions-generator)
    - [TIPoller extra methods](#tipoller-extra-methods)
      - [Available collections](#available-collections)
      - [Find feed by ID](#find-feed-by-id)
      - [Download file](#download-file)
      - [Download PDF reports](#download-pdf-reports)
      - [IP scoring](#ip-scoring)
      - [MITRE ATT\&CK](#mitre-attck)
      - [Global search](#global-search)
    - [DRPPoller methods](#drppoller-methods)
    - [ASMPoller methods](#asmpoller-methods)
    - [Close session](#close-session)
  - [Parsing](#parsing)
    - [Parse portion method](#parse-portion-method)
    - [Get IoCs method](#get-iocs-method)
  - [Utilities](#utilities)
    - [ParserHelper](#parserhelper)
      - [find\_by\_template](#find_by_template)
      - [find\_element\_by\_key](#find_element_by_key)
      - [unpack\_iocs](#unpack_iocs)
    - [Validator](#validator)
  - [Portal Links](#portal-links)
    - [Simple collections (single ID in URL)](#simple-collections-single-id-in-url)
    - [Multi-part URL templates](#multi-part-url-templates)
    - [Custom prefix override](#custom-prefix-override)
    - [Embedding portal links in parsed output](#embedding-portal-links-in-parsed-output)
    - [Inspecting the full URL map](#inspecting-the-full-url-map)
  - [Adapter utilities](#adapter-utilities)
    - [ConfigParser](#configparser)
    - [FileHandler](#filehandler)
  - [Examples](#examples)
    - [Full version of program](#full-version-of-program)
  - [API logic](#api-logic)
    - [Sequence update logic](#sequence-update-logic)
      - [API response](#api-response)
      - [Iteration steps](#iteration-steps)
      - [Stop the iteration](#stop-the-iteration)
    - [Search logic](#search-logic)
      - [Global search](#global-search-1)
      - [Iteration steps](#iteration-steps-1)
      - [Stop the iteration](#stop-the-iteration-1)
  - [Records limits](#records-limits)
  - [Recommended TTL](#recommended-ttl)
  - [Troubleshooting](#troubleshooting)
    - [401 response code](#401-response-code)
    - [403 response code](#403-response-code)
    - [504 response code or timeout](#504-response-code-or-timeout)
  - [FAQ](#faq)

<br>

## **Installation**

Dependencies: **requests** and **urllib3**. The optional `misp` extra (`pip install "ciaops[misp]"`) also installs **pyaml**.

ciaops is available on PyPI:

```
pip install ciaops
```

Alternatively, install from a wheel (WHL) archive downloaded from the Portal. Replace `X.X.X` with the current library version:

```
pip install ./ciaops-X.X.X-py3-none-any.whl
```

<br>

## **Usage**

### Initialization

Initialize a **poller** with your credentials. TLS certificate verification is enabled by default.

Call `set_verify()` only when you need to override the default: pass `False` to disable verification (e.g. local testing behind a corporate proxy), or pass a path to a custom CA bundle.

```python
from ciaops import TIPoller, DRPPoller, ASMPoller

# Threat Intelligence
ti = TIPoller(username='example@example.corp', api_key='API_KEY', api_url='TI_API_URL')

# Digital Risk Protection
drp = DRPPoller(username='example@example.corp', api_key='API_KEY', api_url='DRP_API_URL')

# Attack Surface Management
asm = ASMPoller(username='example@example.corp', api_key='API_KEY', api_url='ASM_API_URL')

# Override only when needed:
ti.set_verify(False)                  # disable TLS check (local testing only)
ti.set_verify('/path/to/ca-bundle')   # custom CA bundle
```

Proxy setup (all pollers share the same interface):

```python
ti.set_proxies(
    proxy_protocol='https',
    proxy_ip='10.0.0.1',
    proxy_port='3128',
    proxy_username='user',       # optional
    proxy_password='secret',     # optional
)
```

Use pollers as context managers to ensure the session is always closed:

```python
with TIPoller(username='...', api_key='...', api_url='...') as ti:
    generator = ti.create_update_generator('apt/threat', sequpdate=16172928022293)
    for portion in generator:
        print(portion.parse_portion())
```

Tag your integration in the User-Agent header with `set_product()` — required by some on-premise deployments:

```python
ti.set_product(
    product_type='SIEM',
    product_name='MySIEM',
    product_version='2.1',
    integration_name='ciaops-siem',
    integration_version='1.0',
)
```

### Collection constants

Use typed constants instead of bare strings to avoid typos and get IDE autocomplete:

```python
from ciaops.collections_meta import TICollections, DRPCollections, ASMCollections

# TI examples
TICollections.APT_THREAT          # "apt/threat"
TICollections.COMPROMISED_ACCOUNT_GROUP  # "compromised/account_group"
TICollections.MALWARE_CNC         # "malware/cnc"

# DRP
DRPCollections.VIOLATION          # "violation"
DRPCollections.COMPROMISED_DARKWEB  # "compromised/darkweb"

# ASM
ASMCollections.ASSETS_UPDATED     # "assets/updated"
ASMCollections.ISSUES_UPDATED     # "issues/updated"
```

Look up the recommended TTL for any TI collection:

```python
TICollections.get_ttl(TICollections.MALWARE_CNC)          # 90  (days)
TICollections.get_ttl(TICollections.COMPROMISED_MESSENGER) # None  (no expiry)
```

Use constants anywhere a collection name string is expected:

```python
poller.set_keys(TICollections.APT_THREAT, keys)
generator = poller.create_update_generator(TICollections.MALWARE_CNC, sequpdate=...)
```

### Collections mapping

Method `set_keys()` sets the **keys** to search for in the selected **collection**. Pass a Python dict `mapping_keys = {key: value}`, where \
**key** is the result name \
**value** is a dot-notation string with the searchable keys

```python
mapping_keys = {"result_name": "searchable_key_1.searchable_key_2"}
```

The parser resolves the keys recursively in the API response, following the dot-notation in **value**.
To add your own data to the results, start the **value** with a star `*`.

```python
mapping_keys = {
    "network": "indicators.params.ip",
    "result_name": "*My_Value"
}
```

With `set_keys()` or `set_iocs_keys()` you can also build a full template to shape nested data the way you want.

```python
mapping_keys = {
    'network': {
        'ips': 'indicators.params.ip'
    },
    'url': 'indicators.params.url',
    'type': '*network'
}
poller.set_keys(collection_name="apt/threat", keys=mapping_keys)
poller.set_iocs_keys(collection_name="apt/threat", keys={"ips": "indicators.params.ip"})
```
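The following sketch shows how the dot-notation and star rules above could be applied to a single feed. It is not the library's implementation. `lookup()` and `apply_mapping()` are hypothetical names, and the sketch covers only dot paths, `*` literals and nested dicts, not the other directives `ParserHelper` supports. When a path reaches a list, the remaining keys are applied to every element, so list-shaped data comes back as lists.

```python
from typing import Any


def lookup(data: Any, path: str) -> Any:
    """Resolve a dot-notation path; lists are traversed element by element."""
    current = data
    for key in path.split("."):
        if isinstance(current, list):
            # Apply the current key to every element of the list
            current = [lookup(item, key) for item in current]
        elif isinstance(current, dict):
            current = current.get(key)
        else:
            # Path goes through a scalar or None: nothing to find
            return None
    return current


def apply_mapping(feed: dict, mapping: dict) -> dict:
    """Hypothetical sketch of applying a mapping template to one feed."""
    result = {}
    for name, value in mapping.items():
        if isinstance(value, dict):
            # Nested template: apply it recursively to the same feed
            result[name] = apply_mapping(feed, value)
        elif value.startswith("*"):
            # Star prefix: the rest of the string is a literal value
            result[name] = value[1:]
        else:
            result[name] = lookup(feed, value)
    return result
```

Applied to the first feed of the [Parsing](#parsing) example, this sketch produces the same dict that `parse_portion()` is documented to return for that feed.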

### Portions generator

Use `create_update_generator()` or `create_search_generator()` to create a generator that returns feeds in limited portions. \
**Update generator** - goes through the feeds in ascending order; iteration is based on the `seqUpdate` field. \
**Search generator** - goes through the feeds in descending order; iteration is based on the `resultId` field.

**Note:** The update generator works with all collections except `compromised/breached` and `compromised/reaper`:
[Sequence update logic](#sequence-update-logic) is not applied to these two collections.

```python
generator = poller.create_update_generator(
    collection_name='compromised/account_group',
    date_from='2021-01-30',
    date_to='2021-02-03',
    query='8.8.8.8',
    sequpdate=20000000,
    limit=200
)
```

Each portion (iterable object) is a `Parser` class object.
Its methods and attributes give you the **raw data** (JSON format) or the **parsed portion** (Python dictionary format).

```python
for portion in generator:
    parsed_json = portion.parse_portion(as_json=False)
    iocs = portion.get_iocs(as_json=False)
    sequpdate = portion.sequpdate
    count = portion.count
    raw_json = portion.raw_json
    raw_dict = portion.raw_dict
    new_parsed_json = portion.bulk_parse_portion(keys_list=[{"ips": "indicators.params.ip"}, {"url": 'indicators.params.url'}], as_json=False)
```

The `sequpdate` attribute of each portion gives you the last **sequence update number** (`seqUpdate`)
of the feeds, which you can save locally.

```python
sequpdate = portion.sequpdate
```

The `count` attribute of each portion shows the number of feeds still left in the queue.
For the search generator, `count` returns the total number of feeds in the queue.

```python
count = portion.count
```

The `parse_portion()` and `get_iocs()` methods of each portion use your
mapping keys (IoCs keys) to return parsed data.
You can override the mapping keys with the `keys` parameter of these methods.

```python
parsed_json = portion.parse_portion(as_json=False)
iocs = portion.get_iocs(as_json=False, keys=mapping_override_keys)
```

You can also use the `bulk_parse_portion()` method to get several parsed dicts from every feed, one per mapping in `keys_list`.

```python
new_parsed_json = portion.bulk_parse_portion(keys_list=[{"ips": "indicators.params.ip"}, {"url": 'indicators.params.url'}], as_json=False)
```

### TIPoller extra methods

#### Available collections

Call `get_available_collections()` to discover which collections your API key can access before iterating.

```python
collection_list = ti.get_available_collections()
seq_update_dict = ti.get_seq_update_dict(date='2020-12-12')
compromised_account_sequpdate = seq_update_dict.get('compromised/account_group')

# Check which collections have active hunting rules applied
hunting_collections = ti.get_hunting_rules_collections()
```

#### Find feed by ID

Returns a `Parser` object for a single feed by its ID.

```python
feed = ti.search_feed_by_id(collection_name='apt/threat', feed_id='abc123')
parsed = feed.parse_portion()
```

#### Download file

Download a binary file embedded inside a threat report.

```python
binary = ti.search_file_in_threats(
    collection_name='hi/threat',
    feed_id='feed_id',
    file_id='file_id_inside_feed',
)
```

#### Download PDF reports

```python
# Download a PDF for an HI or APT threat
pdf_bytes = ti.download_threat_pdf(threat_id='abc123')

# Download an HI analytic report PDF (use the file.name field from an hi/analytic record)
pdf_bytes = ti.download_analytic_report_pdf(file_name='/23ae4ab7.../file/450ffbd4...')

with open('report.pdf', 'wb') as f:
    f.write(pdf_bytes)
```

#### IP scoring

Score one or more IPs against the TI database.

```python
# Single IP
result = ti.scoring('8.8.8.8')
# → {"items": {"8.8.8.8": {"score": 7.5, ...}}}

# Multiple IPs
result = ti.scoring(['8.8.8.8', '1.1.1.1'])
```

#### MITRE ATT&CK

Fetch the full MITRE ATT&CK technique vocabulary or a ready-to-use ID→name map.

```python
# Raw vocabulary (includes all AttackPattern details)
vocab = ti.get_mitre_techniques()

# Convenient ID → name dict
mitre_map = ti.get_mitre_attack_pattern_map()
# → {"T1059": "Command and Scripting Interpreter", "T1078": "Valid Accounts", ...}

technique_name = mitre_map.get("T1059")
```

#### Global search

Search across all TI collections by query string.

```python
results = ti.global_search('8.8.8.8')
# → [{"apiPath": "suspicious_ip/scanner", "count": 14, ...}, ...]
```

### DRPPoller methods

```python
from ciaops import DRPPoller

drp = DRPPoller(username='...', api_key='...', api_url='DRP_URL')
```

**Update generator** — iterate violation feeds:

```python
generator = drp.create_update_generator(
    collection_name='violation',
    sequpdate=1700000000000000,
    subtypes=[6],           # 1=counterfeit 2=piracy 3=partner_policy 4=trademark 5=malware 6=phishing 7=fraud
    section=[1, 2],         # 1=Web 2=Mobile 3=Marketplace 4=Social 5=Advertising 6=Messengers
    brands=['brand_id'],
    approve_states=['under_review'],
)
for portion in generator:
    data = portion.parse_portion()
```

**Find feed by ID:**

```python
feed = drp.search_feed_by_id(feed_id='violation_id')
raw = feed.raw_dict
```

**Change violation status** (only when status=`detected` and approveState=`under_review`):

```python
drp.change_status(feed_id='violation_id', status='approve')  # or 'reject'
```

**Brands and subscriptions:**

```python
brands = drp.get_brands()
# → [{"name": "Brand A", "id": "id1"}, ...]

subscriptions = drp.get_subscriptions()
# → ["scam", "phishing", ...]
```

**Typo-squatting scan** (iterates from the very beginning):

```python
generator = drp.create_update_generator(
    collection_name='violation',
    use_typo_squatting=True,
)
```

**seqUpdate by date:**

```python
seq_dict = drp.get_seq_update_dict(date='2024-01-15')
# → {"violation": 1705276800000000, ...}
```

### ASMPoller methods

```python
from ciaops import ASMPoller

asm = ASMPoller(username='...', api_key='...', api_url='ASM_URL')
```

**List companies:**

```python
companies = asm.get_companies()          # all companies
active = asm.get_companies(status='active')
# → [{"id": "uuid", "name": "Acme Corp"}, ...]
```

**Update generator** — uses POST requests with automatic rate limiting:

```python
generator = asm.create_update_generator(
    collection_name='assets/updated',   # or 'leaks/updated', 'issues/updated'
    company_id='company-uuid',          # or list of UUIDs
    date_from='2024-01-01',
    date_to='2024-06-01',               # optional
    count=500,                          # max 5000
    status=['new', 'confirmed'],        # optional filter
    type=['domain', 'ip'],              # optional filter (assets only)
)
for portion in generator:
    data = portion.parse_portion()
```

**Dashboard scores:**

```python
scores = asm.get_dashboard_scores(company_id='company-uuid')

print(scores.current_score)          # 7.4
print(scores.score_trend)            # "improving" | "declining" | "stable"
print(scores.total_critical)         # 3
print(scores.severity_summary)       # {"critical": 3, "high": 12, ...}
print(scores.counters_summary)       # {"new_assets": 5, "new_issues": 2, ...}
print(scores.lowest_scoring_category)  # {"name": "Network Security", ...}

summary = scores.get_dashboard_summary()  # full dict for periodic updates
raw = asm.get_dashboard_scores(company_id='uuid', as_raw=True)  # plain dict
```

**Issue management:**

```python
evidence = asm.get_issue_evidence(issue_id='issue-uuid')

asm.add_issue_comment(
    company_id='company-uuid',
    issue_id='issue-uuid',
    body='Investigating...',
)

asm.change_issue_status(
    issues_id=['issue-uuid-1', 'issue-uuid-2'],
    status='Under review',   # Detected | Under review | Solved | Ignored | False positive
)
```

**Asset management:**

```python
asm.add_assets(
    company_id='company-uuid',
    confirmed_domain=['example.corp'],
    confirmed_ip=['1.2.3.4'],
)

asm.remove_assets(
    company_id='company-uuid',
    excluded_domain=['old.group-ib.com'],
)

asm.change_asset_status(
    assets_ids=['asset-uuid'],
    status='confirmed',   # new | false | confirmed
)
```

### Close session

Always close the session in a `try…finally` block, or use the context manager:

```python
import logging

from ciaops import TIPoller
from ciaops.exception import InputException, ConnectionException

logger = logging.getLogger(__name__)

poller = None
try:
    poller = TIPoller(username='example@group-ib.com', api_key='API_KEY', api_url='API_URL')
    # ... do work ...
except InputException as e:
    logger.error("Wrong input: %s", e)
except ConnectionException as e:
    logger.error("Connection error: %s", e)
finally:
    # The constructor itself may fail, so close only a session that was actually created
    if poller is not None:
        poller.close_session()
```

<br>

## Parsing

A typical API response from a collection (received feeds):

```python
api_response = [
    {
        'iocs': {
            'network': [
                {
                    'ip': [1, 2],
                    'url': 'url.com'
                },
                {
                    'ip': [3],
                    'url': ''
                }
            ]
        }
    },
    {
        'iocs': {
            'network': [
                {
                    'ip': [4, 5],
                    'url': 'new_url.com'
                }
            ]
        }
    }
]
```

### Parse portion method

Your mapping dict for `parse_portion()` or `bulk_parse_portion()` methods:

```python
mapping_keys = {
    'network': {'ips': 'iocs.network.ip'},
    'url': 'iocs.network.url',
    'type': '*custom_network'
}
```

Result of `parse_portion()` output:

```python
parsing_result = [
    {
        'network': {'ips': [[1, 2], [3]]},
        'url': ['url.com', ''],
        'type': 'custom_network'
    },
    {
        'network': {'ips': [[4, 5]]},
        'url': ['new_url.com'],
        'type': 'custom_network'
    }
]
```

Result of `bulk_parse_portion()` output:

```python
parsing_result = [
    [
        {
            'network': {'ips': [[1, 2], [3]]},
            'url': ['url.com', ''],
            'type': 'custom_network'
        }
    ],
    [
        {
            'network': {'ips': [[4, 5]]},
            'url': ['new_url.com'],
            'type': 'custom_network'
        }
    ]
]
```

### Get IoCs method

Your mapping dict for `get_iocs()` method:

```python
mapping_keys = {
    'ips': 'iocs.network.ip',
    'url': 'iocs.network.url'
}
```

Result of `get_iocs()` output:

```python
parsing_result = {
    'ips': [1, 2, 3, 4, 5],
    'url': ['url.com', 'new_url.com']
}
```

<br>

## Utilities

`ParserHelper` and `Validator` are standalone utilities available for use outside the generator flow — for example, when post-processing raw API responses or building custom pipelines on top of the library.

```python
from ciaops.utils import ParserHelper, Validator
```

### ParserHelper

#### find_by_template

Parse a single feed `dict` against a key-mapping template. Returns a `dict` with the resolved values.

```python
feed = {
    "id": "abc123",
    "evaluation": {"severity": "high"},
    "indicators": [{"params": {"ip": "1.2.3.4"}}, {"params": {"ip": "5.6.7.8"}}]
}

keys = {
    "feed_id":  "id",
    "severity": "evaluation.severity",
    "ips":      "indicators.params.ip",
    "source":   "*Group-IB",
}

result = ParserHelper.find_by_template(feed, keys)
# {
#     "feed_id":  "abc123",
#     "severity": "high",
#     "ips":      ["1.2.3.4", "5.6.7.8"],
#     "source":   "Group-IB",
# }
```

Supported value directives:

| Directive                                                                    | Example                           | Result                                                                     |
| ---------------------------------------------------------------------------- | --------------------------------- | -------------------------------------------------------------------------- |
| Dot-path string                                                              | `"evaluation.severity"`           | Value at that path                                                         |
| `"*literal"` (star prefix)                                                   | `"*Group-IB"`                     | The literal string `"Group-IB"`                                            |
| `"#field[N]"` (hash prefix)                                                  | `"#items[0]"`                     | Element at index N of the list found at `field`                            |
| Nested dict                                                                  | `{"ips": "indicators.params.ip"}` | Recursive template application                                             |
| `{"__nested_dot_path_to_list": "path", ...}`                                 | —                                 | Maps the inner template over each item in the list at `path`               |
| `{"__concatenate": {"static": "https://portal/?id=", "dynamic": "id"}}`      | —                                 | Concatenates a static prefix with a dynamic field value                    |
| `{"__concatenate": {"collection": "apt/threat", "dynamic": "id"}}`           | —                                 | Prefix auto-resolved from portal links for the given collection            |
| `{"__concatenate": {"parts": ["*https://portal/", "category", "*-", "id"]}}` | —                                 | Multi-part concatenation: `*` marks literals, bare strings are field paths |

Optional kwargs:

- `use_join_to_end_list=True` — joins list values into a single comma-separated string.
- `except_keys=["field"]` — excludes specific keys from the joining above.

#### find_element_by_key

Traverse any `dict` or `list` using a dot-notation path. Safe for nested lists and missing keys.

```python
from ciaops.utils import find_element_by_key

find_element_by_key({"a": {"b": 1}}, "a.b")
# → 1

find_element_by_key({"items": [{"ip": "1.2.3.4"}, {"ip": "5.6.7.8"}]}, "items.ip")
# → ["1.2.3.4", "5.6.7.8"]

find_element_by_key({"a": None}, "a.b")
# → None
```

#### unpack_iocs

Recursively flattens a nested list of IoC values into a single deduplicated list. Filters out noise values (`""`, `None`, `"0.0.0.0"`, `"255.255.255.255"`).

```python
raw = [["1.2.3.4", "5.6.7.8"], ["1.2.3.4", None, "0.0.0.0"]]
ParserHelper.unpack_iocs(raw)
# → ["1.2.3.4", "5.6.7.8"]
```
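The flattening described above can be sketched as a recursive walk that skips noise values and keeps the first occurrence of each IoC. This is an illustration, not `ParserHelper.unpack_iocs()` itself. `flatten_iocs()` is a hypothetical name, and preserving first-seen order is an assumption. The sketch also expects hashable values such as strings and numbers.

```python
from typing import Any

# Values treated as noise, as listed in the description above
NOISE = {"", None, "0.0.0.0", "255.255.255.255"}


def flatten_iocs(values: Any) -> list:
    """Hypothetical sketch: flatten nested lists, drop noise, dedupe in first-seen order."""
    result: list = []
    seen: set = set()

    def walk(item: Any) -> None:
        if isinstance(item, list):
            for sub in item:
                walk(sub)
        elif item not in NOISE and item not in seen:
            seen.add(item)
            result.append(item)

    walk(values)
    return result
```

Applied to the per-feed values from the [Get IoCs method](#get-iocs-method) example, the sketch gives the flat lists shown there.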

### Validator

`Validator` guards against invalid inputs before they reach the API.

```python
from ciaops.utils import Validator
```

**`validate_collection_name(collection_name, method=None)`** — raises `InputException` for unknown, deprecated, or removed collection names. When `method="update"` also rejects search-only collections.

```python
Validator.validate_collection_name("apt/threat", method="update")   # OK
Validator.validate_collection_name("attacks/phishing")              # raises InputException: deprecated, use attacks/phishing_group
Validator.validate_collection_name("bp/phishing")                   # raises InputException: removed
```

**`validate_date_format(date, formats)`** — raises `InputException` if the date string does not match any of the provided format strings.

```python
Validator.validate_date_format("2024-01-15", ("%Y-%m-%d",))   # OK
Validator.validate_date_format("15/01/2024", ("%Y-%m-%d",))   # raises InputException
```
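A date check like this can be built on `datetime.strptime`, trying each format in turn. The minimal sketch below returns a boolean instead of raising `InputException`. `matches_any_format()` is a hypothetical helper, not part of `Validator`.

```python
from datetime import datetime


def matches_any_format(date: str, formats: tuple[str, ...]) -> bool:
    """Return True if `date` parses with at least one strptime format string."""
    for fmt in formats:
        try:
            datetime.strptime(date, fmt)
            return True
        except ValueError:
            # Wrong layout or impossible date (e.g. Feb 30); try the next format
            continue
    return False
```

Because `strptime` also validates the calendar, a string with the right layout but an impossible day is rejected as well.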

**`validate_ips_argument(ips)`** — normalizes and validates the `ips` argument for the scoring endpoint. Accepts a single IP string or a list of IP strings; raises `InvalidIpsParameter` on invalid input. Returns a normalized list.

```python
Validator.validate_ips_argument("8.8.8.8")             # → ["8.8.8.8"]
Validator.validate_ips_argument(["8.8.8.8", "1.1.1.1"]) # → ["8.8.8.8", "1.1.1.1"]
Validator.validate_ips_argument("8.8.8.8,1.1.1.1")     # raises InvalidIpsParameter
```

<br>

## Portal Links

`PORTAL_LINKS` and `generate_portal_link` map collection records to their Group-IB Portal URLs.

```python
from ciaops import PORTAL_LINKS, generate_portal_link
```

### Simple collections (single ID in URL)

Most collections use a plain prefix + record ID pattern:

```python
link = generate_portal_link('apt/threat', record_id='abc123')
# → "https://tap.group-ib.com/ta/last-threats?threat=abc123"

link = generate_portal_link('malware/config', record_id='def456')
# → "https://tap.group-ib.com/malware/configs?id=def456"

link = generate_portal_link('compromised/account_group', record_id='ghi789')
# → "https://tap.group-ib.com/cd/accounts?id=ghi789"
```

Returns `None` when `record_id` is empty or the collection has no portal mapping.

### Multi-part URL templates

Some collections require multiple fields from the feed record (e.g. `compromised/messenger`, `compromised/discord`). Pass all required field values via the `fields` dict:

```python
link = generate_portal_link(
    'compromised/messenger',
    fields={'chatStat.id': '1234', 'id': '5678'},
)
# → "https://tap.group-ib.com/ta/im?chatId=1234&msg=5678"

link = generate_portal_link(
    'compromised/discord',
    fields={'channel.id': 'ch99', 'id': 'msg42'},
)
# → "https://tap.group-ib.com/ta/im?collection=discord&chatId=ch99&msg=msg42"
```

Returns `None` if any required field is missing or empty.

### Custom prefix override

```python
link = generate_portal_link('my/collection', record_id='001', url_prefix='https://tap.group-ib.com/feed?id=')
# → "https://tap.group-ib.com/feed?id=001"
```

### Embedding portal links in parsed output

Use the `__concatenate` directive in your mapping template so that `ParserHelper` resolves the URL automatically during parsing:

```python
keys = {
    'id':         'id',
    'title':      'title',
    'portal_url': {'__concatenate': {'collection': 'apt/threat', 'dynamic': 'id'}},
}
result = ParserHelper.find_by_template(feed, keys)
# result['portal_url'] → "https://tap.group-ib.com/ta/last-threats?threat=<id>"
```

### Inspecting the full URL map

```python
from ciaops import PORTAL_LINKS

for collection, template in PORTAL_LINKS.items():
    print(collection, '->', template)
```

<br>

## Adapter utilities

`ConfigParser` and `FileHandler` are used by file-config based adapters such as the MISP adapter. They are not required for standard TI/DRP/ASM polling.

```python
from ciaops.adapters.misp_utils import ConfigParser, FileHandler
```

> **Note:** `ConfigParser` and `FileHandler` are also re-exported from `ciaops.utils` for backward compatibility, but the canonical import is from `ciaops.adapters.misp_utils`.

### ConfigParser

Parses YAML and JSON config files used by MISP-style adapters.

```python
cp = ConfigParser()

# Extract credentials from a YAML config dict as a dynamic Enum
creds = cp.get_creds(yaml_config)          # reads yaml_config["creds"]
creds = cp.get_creds(yaml_config, key="auth")  # custom key

creds.USERNAME.value   # "user@example.corp"
creds.API_KEY.value    # "abc123"
creds.API_URL.value    # "https://..."

# Get only enabled / disabled collections from YAML config
enabled  = ConfigParser.get_enabled_collections(yaml_config)   # ["apt/threat", ...]
disabled = ConfigParser.get_disabled_collections(yaml_config)

# Read a single collection's default_date
date = ConfigParser.get_collection_default_date(yaml_config, "apt/threat")
```
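`get_creds()` returns the credentials as a dynamic Enum. Python's functional `Enum` API can build such a type from a mapping. The sketch below assumes the `creds` section holds lower-case keys (`username`, `api_key`, `api_url`) and upper-cases them into member names. `creds_enum()` is hypothetical and does not reproduce ConfigParser's exact behavior. Members that share a value become aliases, but each one is still reachable by name.

```python
from enum import Enum


def creds_enum(config: dict, key: str = "creds") -> type[Enum]:
    """Hypothetical sketch: turn config[key] into an Enum with upper-cased member names."""
    section = config[key]
    # The functional Enum API accepts a mapping of member names to values
    return Enum("Creds", {name.upper(): value for name, value in section.items()})
```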

### FileHandler

A Borg-singleton file handler for reading and writing YAML and JSON config files. All instances share the same internal state, providing safe concurrent access via an `in_progress` flag.

```python
fh = FileHandler()

# Check file existence / emptiness
fh.is_exist("/path/to/config.yml")   # True / False
fh.is_empty("/path/to/config.yml")   # True / False

# Read configs
yaml_config = fh.read_yaml_config("/path/to/config.yml")
json_config = fh.read_json_config("/path/to/mapping.json")

# Persist updated collection state back to YAML
fh.save_collection_info(
    config="/path/to/config.yml",
    collection="apt/threat",
    seqUpdate=16172928022293,
    default_date="2024-01-15",
)

# Overwrite an entire config file
fh.save_data_to_yaml_config(data, "/path/to/config.yml")
fh.save_data_to_json_config(data, "/path/to/mapping.json")
```
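In the Borg pattern, every `FileHandler()` call returns a distinct object, but all of them share one attribute dictionary. The minimal illustration below uses `SharedStateHandler`, a hypothetical class rather than FileHandler's source. It adds no locking of its own.

```python
class SharedStateHandler:
    """Minimal Borg pattern: every instance shares one attribute dictionary."""

    _shared_state: dict = {}

    def __init__(self) -> None:
        # Point this instance's attribute storage at the class-wide dict
        self.__dict__ = self._shared_state
        # setdefault keeps an existing flag instead of resetting it on every new instance
        self.__dict__.setdefault("in_progress", False)
```

Setting `in_progress` on one instance makes it visible on every other instance, including ones created later.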

<br>

## Examples

### Full version of program

```python
import logging
from ciaops import TIPoller
from ciaops.exception import InputException, ConnectionException, ParserException

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
...  # define username, api_key, api_url, PROXY_* settings, keys_config, update_generator_config, save_portion()

poller = None
try:
    poller = TIPoller(username=username, api_key=api_key, api_url=api_url)
    poller.set_proxies(proxy_protocol=PROXY_PROTOCOL,
                       proxy_port=PROXY_PORT,
                       proxy_ip=PROXY_ADDRESS,
                       proxy_password=PROXY_PASSWORD,
                       proxy_username=PROXY_USERNAME)
    poller.set_verify(True)
    # Register a mapping template for every collection
    for collection, keys in keys_config.items():
        poller.set_keys(collection, keys)
    for collection, state in update_generator_config.items():
        # Resume from a saved seqUpdate, or derive one from a start date
        if state.get("sequpdate"):
            generator = poller.create_update_generator(collection_name=collection, sequpdate=state.get("sequpdate"))
        elif state.get("date_from"):
            sequpdate = poller.get_seq_update_dict(date=state.get('date_from'), collection_name=collection).get(collection)
            generator = poller.create_update_generator(collection_name=collection, sequpdate=sequpdate)
        else:
            continue
        for portion in generator:
            parsed_portion = portion.parse_portion()
            save_portion(parsed_portion)
            # Remember the last seqUpdate so the next run can resume from here
            update_generator_config[collection]["sequpdate"] = portion.sequpdate

except InputException as e:
    logging.exception("Wrong input: %s", e)
except ConnectionException as e:
    logging.exception("Something went wrong with the connection: %s", e)
except ParserException as e:
    logging.exception("Exception occurred during parsing: %s", e)
finally:
    if poller is not None:
        poller.close_session()
```

<br>

## API logic

To iterate over the portions received from the API, use one of the following iteration methods:

- **Result ID iteration** - based on the `resultId` parameter retrieved from the previous response.
  Uses the plain collection endpoint (`apt/threat`) appended to the base URL >>> `/api/v2/apt/threat`.
- **Sequence update iteration** - based on the `seqUpdate` parameter retrieved from the previous response.
  Uses the `/updated` endpoint after the collection name (`/apt/threat`) >>> `/api/v2/apt/threat/updated`.

To search for IPs, domains, hashes, emails, etc., use the following logic:

- **Search logic** -
  First call the `/api/v2/search` endpoint with your query in the `q` parameter >>> `/api/v2/search?q=8.8.8.8`.
  The response lists the collections that contain the search result (`8.8.8.8`).
  Use _Sequence update iteration_ as the next step to retrieve all events.

To get the latest updates for each collection's events, use the following logic:

- **Sequence update logic** -
  First call the `/api/v2/sequence_list` endpoint with the `date` and `collection` parameters (optional) >>> `/api/v2/sequence_list?date=2022-01-01&collection=apt/threat`.
  The response contains a `seqUpdate` number, which you pass in the next request to the collection's `/updated` endpoint.
  Use _Sequence update iteration_ as the next step to retrieve all events.

<br>

### Sequence update logic

Most collections in the Threat Intelligence portal have an `/updated` endpoint.
This endpoint iterates based on the `seqUpdate` key field, which comes from the API JSON response.

The `seqUpdate` value is the time since the Unix epoch, expressed as a large number in microseconds, using the following formula:

```text
UTC timestamp * 1000 * 1000.
```

_Note:_ Don't rely on this formula: as the amount of data grows, it may change.
The `/api/v2/sequence_list` endpoint exists for this purpose.
Use it to get the required `seqUpdate` number.
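For illustration only, the sketch below evaluates the formula for a calendar date at midnight UTC. The helper `approx_sequpdate()` is hypothetical. As the note says, request the real value from `/api/v2/sequence_list` (or `get_seq_update_dict()`) instead of relying on this arithmetic.

```python
from datetime import datetime, timezone


def approx_sequpdate(date: str) -> int:
    """Estimate seqUpdate for midnight UTC of a YYYY-MM-DD date: UTC timestamp * 1000 * 1000."""
    dt = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return int(dt.timestamp()) * 1000 * 1000
```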

#### API response

Each row in our database has its own unique sequence update number, so all events can be retrieved one by one.
To see this, explore the JSON output and each item in the `"items"` field:
every item contains a `seqUpdate` field, and the last element's `seqUpdate` is also placed at the top level of the JSON output.
You can use it to get the next portion of feeds.
Each collection has its own updated route, such as `/api/v2/apt/threat/updated`; the output below is an example.

```json
{
    "count": 1761,
    "items": [
        {"id": "fake286ca753feed3476649438e4e4488"...},
        {"id": "fake51d29357b22b80564a1d2f9fc8751"...},
        {
            "author": null,
            "companyId": [],
            "id": "fake4f16300296d20ef9b909dc0d354fb",
            ......,
            "indicators": [
                {
                    "dateFirstSeen": null,
                    "dateLastSeen": null,
                    "deleted": false,
                    "description": null,
                    "domain": "example.corp",
                    "id": "fakebe483bb82759fbee7038235e0f52d0",
                    .....
                }
            ],
            "indicatorsIds": [
                "fakebe483bb82759fbee7038235e0f52d0"
            ],
            "isPublished": true,
            "isTailored": false,
            "labels": [],
            "langs": [
                "en"
            ],
            "malwareList": [],
            ......,
            "seqUpdate": 16172928022293
        },
    ],
    "seqUpdate": 16172928022293
}
```
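
A small helper for picking the next `seqUpdate` from a response like the one above might look as follows. `next_seq_update` is a hypothetical name; it prefers the top-level field and falls back to the last item's value, as described above.

```python
from typing import Any, Dict, Optional


def next_seq_update(page: Dict[str, Any]) -> Optional[int]:
    """Return the seqUpdate to send next, or None for an empty page."""
    top = page.get("seqUpdate")
    if top is not None:
        return top  # top-level field: equals the last item's seqUpdate
    items = page.get("items") or []
    return items[-1].get("seqUpdate") if items else None


print(next_seq_update({"items": [{"id": "x", "seqUpdate": 16172928022293}], "seqUpdate": 16172928022293}))  # 16172928022293
print(next_seq_update({"items": [{"seqUpdate": 1}, {"seqUpdate": 2}]}))  # 2
print(next_seq_update({"count": 0, "items": []}))  # None
```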

#### Iteration steps

To iterate over the `/api/v2/apt/threat/updated` endpoint data, take the `seqUpdate` number (`"seqUpdate": 16172928022293`)
from the top level of the JSON response received from the previous request
or from the `/sequence_list` endpoint.

```console
curl -X 'GET' 'https://<base URL>/api/v2/sequence_list'
```

Pass the gathered `seqUpdate` as a query parameter in the next request:

```console
curl -X 'GET' 'https://<base URL>/api/v2/apt/threat/updated?seqUpdate=16172928022293'
```

In the received JSON output, check `"count": 1751`. -> \
Gather `seqUpdate` from the last feed or from the top level -> \
Put it in the next request ->

```console
curl -X 'GET' 'https://<base URL>/api/v2/apt/threat/updated?seqUpdate=16172928536227'
```

In the received JSON output, check `"count": 1741` -> \
Gather `seqUpdate` from the last feed or from the top level -> \
Repeat until the end.

#### Stop the iteration

The stop condition for this logic is the items `"count"` or the length of the `"items"` list.
For the `apt/threat` collection in the example above, the default `limit` is 10;
other collections usually have a `limit` of 100. The limit depends on the size of the data, so that the JSON output is not overloaded.
For example, for a collection with a limit of 100, you usually receive a portion of 100 feeds in the first iteration ->
then perhaps a portion of 23 feeds -> then a portion of 0 feeds -> the end.
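
Putting the iteration steps and the stop condition together, a paging loop could be sketched as below. It assumes a hypothetical `fetch(path, params)` callable that performs the authenticated GET against `/api/v2/<path>` and returns the parsed JSON; this is not a ciaops function. The loop stops on an empty `"items"` list, and also if `seqUpdate` does not advance, to avoid an endless loop.

```python
from typing import Any, Callable, Dict, Iterator, Optional

# Hypothetical transport: fetch(path, params) -> parsed JSON of GET /api/v2/<path>.
Fetch = Callable[[str, Dict[str, Any]], Dict[str, Any]]


def iterate_updated(fetch: Fetch, collection: str, seq_update: int,
                    limit: Optional[int] = None) -> Iterator[Dict[str, Any]]:
    """Yield every item from <collection>/updated, starting from seq_update."""
    while True:
        params: Dict[str, Any] = {"seqUpdate": seq_update}
        if limit is not None:
            params["limit"] = limit
        page = fetch(f"{collection}/updated", params)
        items = page.get("items") or []
        if not items:  # stop condition: an empty portion
            return
        yield from items
        # Next seqUpdate: the top-level field, else the last item's value.
        next_seq = page.get("seqUpdate")
        if next_seq is None:
            next_seq = items[-1].get("seqUpdate")
        if next_seq is None or next_seq == seq_update:
            return  # cannot advance; stop instead of looping forever
        seq_update = next_seq


# Usage with canned pages standing in for the API.
pages = {
    100: {"count": 3, "items": [{"id": "a", "seqUpdate": 101}, {"id": "b", "seqUpdate": 102}], "seqUpdate": 102},
    102: {"count": 1, "items": [{"id": "c", "seqUpdate": 103}], "seqUpdate": 103},
    103: {"count": 0, "items": []},
}
ids = [item["id"] for item in iterate_updated(lambda path, params: pages[params["seqUpdate"]], "apt/threat", 100)]
print(ids)  # ['a', 'b', 'c']
```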

<br>

### Search logic

Search logic is used to find records related to a search value in the Threat Intelligence database.

#### Global search

To find events related to an IP, domain, hash, email, etc., send a request to the `/api/v2/search` endpoint
with any `q` parameter (`/api/v2/search?q=8.8.8.8`).
It returns a list of the collections that contain the search value.
As the next step, use _Sequence update iteration_ over all items in each collection.
To avoid unrelated results, you can prefix the `q` value with a type keyword, for example `/api/v2/search?q=ip:8.8.8.8`.
The same works for domains, emails, hashes, etc. (`/api/v2/search?q=domain:example.corp`, `/api/v2/search?q=email:example@example.corp`).

```json
[
    {
        "apiPath": "suspicious_ip/open_proxy",
        "label": "Suspicious IP :: Open Proxy",
        "link": "https://<base-url>/api/v2/suspicious_ip/open_proxy?q=ip:8.8.8.8",
        "count": 14,
        "time": 0.304644684,
        "detailedLinks": null
    },
    {
        "apiPath": "attacks/ddos",
        "label": "Attack :: DDoS",
        "link": "https://<base-url>/api/v2/attacks/ddos?q=ip:8.8.8.8",
        "count": 1490,
        "time": 0.389418291,
        "detailedLinks": null
    },
    {"apiPath": "attacks/deface"...},
    {"apiPath": "malware/config"...},
    {"apiPath": "suspicious_ip/scanner"...}
]
```
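
To turn a search response like the one above into a work list, you can keep only the collections with a non-zero `count`. The helper name `collections_to_fetch` is hypothetical; the sample reuses the two complete entries from the response above.

```python
from typing import Any, Dict, List, Tuple


def collections_to_fetch(search_results: List[Dict[str, Any]]) -> List[Tuple[str, int]]:
    """Return (apiPath, count) for each collection with at least one match."""
    return [(r["apiPath"], r["count"]) for r in search_results if r.get("count")]


sample = [
    {"apiPath": "suspicious_ip/open_proxy", "label": "Suspicious IP :: Open Proxy", "count": 14},
    {"apiPath": "attacks/ddos", "label": "Attack :: DDoS", "count": 1490},
]
print(collections_to_fetch(sample))
# [('suspicious_ip/open_proxy', 14), ('attacks/ddos', 1490)]
```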

#### Iteration steps

The first search step shows that the `attacks/ddos` collection contains 1490 items (`"count": 1490`).
To extract all of them, first send a request to this collection with the `q` parameter (`?q=ip:8.8.8.8`).
Then take the `"seqUpdate"` field from the top level of the JSON response and use it in the next request (`"seqUpdate": 1673373011294`).

```json
{
  "count": 1490,
  "items": [
    {
      "body": null,
      "cnc": {"cnc": "http://example.corp/drv/"...},
      "company": null,
      "companyId": null,
      "dateBegin": null,
      "dateEnd": null,
      "dateReg": "2017-08-16T00:00:00+00:00",
      "evaluation": {},
      "favouriteForCompanies": [],
      "headers": [],
      "hideForCompanies": [],
      "id": "examplec58903baddc84b8c51eaef1f904374025d",
      "isFavourite": false,
      ...
    }
  ],
  ...,
  "seqUpdate": 1673373011294
}
```

So the next request should look like this `/api/v2/attacks/ddos/updated?q=ip:8.8.8.8&seqUpdate=1673373011294`.
We can also set the `limit` parameter in the requests, like `limit=500`.
Explore the example below.

```console
curl -X 'GET' 'https://<base URL>/api/v2/search?q=ip:8.8.8.8'
```

Pass the gathered `seqUpdate` as a query parameter in the next request, keeping the same `q` value:

```console
curl -X 'GET' 'https://<base URL>/api/v2/attacks/ddos/updated?q=ip:8.8.8.8&seqUpdate=1673373011294'
```

In the received JSON output, check `"count": 1390`. -> \
Gather `seqUpdate` from the last feed or from the top level -> \
Put it in the next request ->

```console
curl -X 'GET' 'https://<base URL>/api/v2/attacks/ddos/updated?q=ip:8.8.8.8&seqUpdate=1673375930599'
```

In the received JSON output, check `"count": 1290` -> \
Gather `seqUpdate` from the last feed or from the top level -> \
Repeat until the end.

#### Stop the iteration

The stop condition for this logic is the items `"count"` or the length of the `"items"` list.
For the `attacks/ddos` collection in the example above, the default `limit` is 100;
for other collections it may differ. The limit depends on the size of the data, so that the JSON output is not overloaded.
For example, you usually receive a portion of 100 feeds in the first iteration ->
then perhaps a portion of 23 feeds -> then a portion of 0 feeds -> the end.
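
The whole search flow (search first, then sequence update iteration per collection with the same `q`) can be sketched as a generator. As a hedged sketch, it assumes a hypothetical `fetch(path, params)` callable returning parsed JSON, and that the first request to each collection goes to `<apiPath>/updated` with only `q`, with later requests adding the `seqUpdate` from the previous response; check this against your portal's API documentation.

```python
from typing import Any, Callable, Dict, Iterator, Optional, Tuple

Fetch = Callable[[str, Dict[str, Any]], Any]  # hypothetical: fetch(path, params) -> parsed JSON


def iterate_search(fetch: Fetch, query: str,
                   limit: Optional[int] = None) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Yield (apiPath, item) for every item matching `query` in every collection."""
    for hit in fetch("search", {"q": query}):
        if not hit.get("count"):
            continue  # collection has no matches
        params: Dict[str, Any] = {"q": query}
        if limit is not None:
            params["limit"] = limit
        while True:
            page = fetch(f"{hit['apiPath']}/updated", params)
            items = page.get("items") or []
            if not items:  # empty portion: this collection is done
                break
            for item in items:
                yield hit["apiPath"], item
            next_seq = page.get("seqUpdate")
            if next_seq is None:
                next_seq = items[-1].get("seqUpdate")
            if next_seq is None or next_seq == params.get("seqUpdate"):
                break  # cannot advance; avoid an endless loop
            params = {**params, "seqUpdate": next_seq}


# Usage with a fake fetch standing in for the API.
def fake_fetch(path, params):
    if path == "search":
        return [{"apiPath": "attacks/ddos", "count": 2}, {"apiPath": "attacks/deface", "count": 0}]
    seq = params.get("seqUpdate")
    if seq is None:
        return {"count": 2, "items": [{"id": "x", "seqUpdate": 5}], "seqUpdate": 5}
    if seq == 5:
        return {"count": 1, "items": [{"id": "y", "seqUpdate": 6}], "seqUpdate": 6}
    return {"count": 0, "items": []}


print([(path, item["id"]) for path, item in iterate_search(fake_fetch, "ip:8.8.8.8")])
# [('attacks/ddos', 'x'), ('attacks/ddos', 'y')]
```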

<br>

## Records limits

The default limit for most collections is 100 records per request. Because feeds differ in size, the limits differ between collections.

To change the number of records in a response, add the `limit` parameter to the request, for example `limit=500`.
The limits for all collections can be found in the Portal documentation.

```console
curl -X 'GET' 'https://<base URL>/api/v2/apt/threat/updated?limit=500&seqUpdate=16172928022293'
```

<br>

## Recommended TTL

TTL (Time To Live) is the maximum length of time an indicator or dataset (package) can exist. It is measured in days: during this period the platform guarantees that the data represents a valid, active IoC. Once the TTL expires, the record should be considered stale and removed or re-evaluated. `None` means no expiry: the data has no defined lifetime and should be retained indefinitely.

| Endpoint                              | Recommended TTL (days) |
| ------------------------------------- | ---------------------- |
| **Threat Intelligence**               |                        |
| `apt/threat_actor/updated`            | 360                    |
| `hi/threat_actor/updated`             | 360                    |
| `apt/threat/updated`                  | 360                    |
| `hi/threat/updated`                   | 360                    |
| `hi/open_threats/updated`             | None                   |
| `hi/analytic/updated`                 | None                   |
| **Malware**                           |                        |
| `malware/config/updated`              | 30                     |
| `malware/malware/updated`             | None                   |
| `malware/signature/updated`           | None                   |
| `malware/yara/updated`                | None                   |
| `malware/cnc/updated`                 | 90                     |
| **Attacks**                           |                        |
| `attacks/phishing_kit/updated`        | 30                     |
| `attacks/phishing_group/updated`      | 30                     |
| `attacks/ddos/updated`                | 30                     |
| `attacks/deface/updated`              | 30                     |
| **Vulnerabilities**                   |                        |
| `osi/vulnerability/updated`           | 30                     |
| **Compromised**                       |                        |
| `compromised/messenger/updated`       | None                   |
| `compromised/discord/updated`         | None                   |
| `compromised/access/updated`          | 90                     |
| `compromised/account_group/updated`   | 90                     |
| `compromised/breached/updated`        | 90                     |
| `compromised/breacheddb/updated`      | 90                     |
| `compromised/reaper/updated`          | 90                     |
| `compromised/bank_card_group/updated` | 90                     |
| `compromised/masked_card/updated`     | 90                     |
| `compromised/spd/updated`             | 90                     |
| **OSI**                               |                        |
| `osi/public_leak/updated`             | 30                     |
| `osi/git_repository/updated`          | 30                     |
| **Suspicious IP**                     |                        |
| `suspicious_ip/tor_node/updated`      | 30                     |
| `suspicious_ip/open_proxy/updated`    | 15                     |
| `suspicious_ip/socks_proxy/updated`   | 2                      |
| `suspicious_ip/vpn/updated`           | 30                     |
| `suspicious_ip/scanner/updated`       | 15                     |
| **IoC**                               |                        |
| `ioc/common/updated`                  | 90                     |
| `ioc/primary/updated`                 | 90                     |
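
As a sketch of how the table might be applied, the helper below flags records whose recommended TTL has elapsed. It assumes the TTL is counted from the moment your system received the record (the text does not specify the reference point) and copies only a few rows from the table; `RECOMMENDED_TTL_DAYS` and `is_stale` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# A few rows copied from the table above (days; None = no expiry).
RECOMMENDED_TTL_DAYS = {
    "apt/threat/updated": 360,
    "malware/cnc/updated": 90,
    "suspicious_ip/socks_proxy/updated": 2,
    "hi/analytic/updated": None,
}


def is_stale(endpoint: str, received_at: datetime,
             now: Optional[datetime] = None) -> bool:
    """True if a record from `endpoint` has outlived its recommended TTL.

    Assumption: the TTL is counted from `received_at` (when you ingested it).
    """
    ttl_days = RECOMMENDED_TTL_DAYS[endpoint]
    if ttl_days is None:
        return False  # no defined lifetime: keep indefinitely
    now = now or datetime.now(timezone.utc)
    return now - received_at > timedelta(days=ttl_days)


received = datetime(2024, 1, 1, tzinfo=timezone.utc)
later = datetime(2024, 1, 4, tzinfo=timezone.utc)
print(is_stale("suspicious_ip/socks_proxy/updated", received, later))  # True
print(is_stale("apt/threat/updated", received, later))  # False
```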

<br>

## Troubleshooting

### 401 response code

This code is returned if no credentials were sent. Make sure that you send the `Authorization` header and use Basic auth.
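
For reference, a Basic auth header is the word `Basic` followed by the Base64 encoding of `username:password` (RFC 7617). The sketch below assumes the portal username is the login and the API key is the password; `basic_auth_header` is a hypothetical helper. With `requests`, passing `auth=(username, api_key)` produces the same header.

```python
import base64


def basic_auth_header(username: str, api_key: str) -> dict:
    """Return an Authorization header for HTTP Basic auth (RFC 7617).

    Assumption: username = portal login, password = API key.
    """
    credentials = f"{username}:{api_key}".encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(credentials).decode("ascii")}


print(basic_auth_header("user", "pass"))  # {'Authorization': 'Basic dXNlcjpwYXNz'}
```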

### 403 response code

There are several possible reasons for it:

- IP limitation. Make sure that you send requests from an allowed IP address. See above for how to set up your private IP list.
- API key issue. Make sure that your API key is active and valid. Try regenerating it as described above.
- No access to the feed. Make sure that you have access to the requested feed. You can find the available feeds on the Profile page -> Security and Access.

### 504 response code or timeout

Try setting a smaller limit when requesting the API.
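
One way to apply this advice automatically is to retry with a smaller `limit` after a timeout. The sketch halves the limit on each retry down to a floor; it assumes a hypothetical `fetch(path, params)` that raises `TimeoutError` on a timeout or 504 response (with `requests` you would map `requests.exceptions.Timeout` and HTTP 504 to that yourself). The starting value of 500 and the floor of 10 are arbitrary choices.

```python
from typing import Any, Callable


def fetch_with_shrinking_limit(fetch: Callable[[str, dict], Any], path: str,
                               params: dict, limit: int = 500,
                               min_limit: int = 10) -> Any:
    """Call fetch(); on TimeoutError, halve `limit` and retry down to `min_limit`."""
    while True:
        try:
            return fetch(path, {**params, "limit": limit})
        except TimeoutError:
            if limit <= min_limit:
                raise  # already at the floor: give up
            limit = max(min_limit, limit // 2)


# Example with a fake fetch that times out for limits above 100.
def fake_fetch(path, params):
    if params["limit"] > 100:
        raise TimeoutError
    return {"limit_used": params["limit"]}


print(fetch_with_shrinking_limit(fake_fetch, "attacks/ddos/updated", {"seqUpdate": 1673373011294}))
# {'limit_used': 62}
```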

## FAQ

Have a question? Ask in an SD ticket on our Portal or email integration@group-ib.com.
