nrp_cmd.async_client.base_client
Async client for NRP repositories.
"""Async client for NRP repositories.

Declares the structural (``typing.Protocol``) interfaces of the asynchronous
repository client and of its records sub-client, together with the
``RecordStatus`` selector used by the record operations.
"""
# TODO: cannot use `from __future__ import annotations` here,
# as it is not compatible with attrs trying to resolve the
# type annotations at runtime

from collections.abc import AsyncIterator
from contextlib import AbstractAsyncContextManager
from enum import Enum
from typing import Any, Optional, Protocol, Self

from yarl import URL

from ..config import RepositoryConfig
from ..types.info import RepositoryInfo
from ..types.records import Record, RecordId, RecordList
from .connection.limiter import Limiter


class RecordStatus(Enum):
    """Selector for records.

    Passed as the ``status`` argument of the record operations to choose
    which records are addressed.
    """

    ALL = "all"
    """All records"""

    PUBLISHED = "published"
    """Published records"""

    DRAFT = "draft"
    """Draft records"""


class AsyncRecordsClient(Protocol):
    """Client class for access to records.

    This is a protocol: every method body is a ``...`` placeholder and the
    behaviour is supplied by concrete implementations.
    """

    def with_model(self, model: str) -> Self:
        """Return a new client limited to the given model.

        :param model: name of the model the returned client is limited to
        :return: a client of the same type as this one
        """
        ...

    @property
    def published_records(self) -> Self:
        """Return a new client limited to published records.

        :return: a client of the same type as this one
        """
        ...

    @property
    def draft_records(self) -> Self:
        """Return a new client limited to draft records.

        :return: a client of the same type as this one
        """
        ...

    async def create(
        self,
        data: dict[str, Any],
        *,
        model: str | None = None,
        community: str | None = None,
        workflow: str | None = None,
        idempotent: bool = False,
        files_enabled: bool = True,
    ) -> Record:
        """Create a new record in the repository.

        :param data: the metadata of the record
        :param model: optional model of the record
                    (NOTE(review): presumably not needed on a client obtained
                    via ``with_model`` - confirm)
        :param community: community in which the record should be created
        :param workflow: the workflow to use for the record, if not provided
                        the default workflow of the community is used
        :param idempotent: if True, the operation is idempotent and can be retried on network errors.
                        Use only if you know that the operation is idempotent, for example that
                        you use PID generator that takes the persistent identifier from the data.
        :param files_enabled: NOTE(review): presumably controls whether files can be
                        attached to the created record - confirm
        :return: the created record
        """
        ...

    async def read(
        self,
        record_id: RecordId,
        *,
        model: str | None = None,
        status: RecordStatus | None = None,
        **query: str
    ) -> Record:
        """Read a record from the repository.

        :param record_id: the id of the record. Could be either pid or url
        :param model: optional model of the record
        :param status: optional ``RecordStatus`` selector
                    (NOTE(review): presumably picks the published or the draft
                    version of the record - confirm)
        :param query: extra arguments to read, repository specific
        :return: the record
        """
        ...

    async def update(
        self,
        record: Record,
        *,
        verify_version: bool = True
    ) -> Record:
        """Update a record in the repository.

        The record must have an id and optionally
        an etag. If the etag is not provided, the record is updated without checking
        the etag. If the etag is provided, the record is updated only if the etag matches
        the current etag of the record in the repository.

        An updated version, as stored in the repository, is returned.

        :param record: record that will be stored to the server
        :param verify_version: if set to true, verify that the record
                               on the server has not been modified in the meantime
        :return: the updated record, as stored in the repository
        """
        ...

    async def patch(
        self,
        record_id: RecordId,
        metadata: dict[str, Any],
        *,
        model: str | None = None,
        status: RecordStatus | None = None,
        etag: str | None = None,
    ) -> Record:
        """Patch a record in the repository.

        The record is deep-patched with the metadata, using the following algorithm:

        * if a key in a dictionary is called 'replace:<key>', the whole content will be replaced.
        * dictionaries: are merged, with primitive values from metadata overriding
          values in the record. None in the metadata removes the key from the record
        * arrays: values from the metadata will be added at the end.

        :param record_id: identification of the record
        :param metadata: dictionary containing the metadata
        :param model: optional model of the record
        :param status: optional ``RecordStatus`` selector
                    (NOTE(review): exact effect is not visible here - confirm)
        :param etag: if set, check that we are patching this etag
        :return: a ``Record`` (NOTE(review): presumably the patched record as
                 stored in the repository - confirm)
        """
        ...

    async def delete(
        self,
        record_id_or_record : RecordId | Record,
        *,
        etag: str | None = None,
        status: RecordStatus | None = None,
    ) -> None:
        """Delete a record inside the repository.

        :param record_id_or_record: either the identification of the record or a record
                    downloaded from the repository. If an identification is passed, you can
                    specify the etag as well
        :param etag: if a record identification is passed, delete the record only
                    if the version matches
        :param status: optional ``RecordStatus`` selector
                    (NOTE(review): exact effect is not visible here - confirm)
        """
        ...

    async def search(
        self,
        *,
        q: Optional[str] = None,
        page: Optional[int] = None,
        size: Optional[int] = None,
        model: str | None = None,
        status: RecordStatus | None = None,
        **facets: str,
    ) -> RecordList:
        """Search for records in the repository.

        NOTE(review): the parameter descriptions below are inferred from the
        parameter names only - confirm against a concrete implementation.

        :param q: optional query string
        :param page: optional page number
        :param size: optional page size
        :param model: optional model the search is limited to
        :param status: optional ``RecordStatus`` selector
        :param facets: extra keyword arguments, presumably facet filters
        :return: a ``RecordList``
        """
        ...

    async def next_page(self, *, record_list: RecordList) -> RecordList:
        """Get the next page of records.

        :param record_list: a record list (NOTE(review): presumably the current
                    page, e.g. one returned by ``search`` - confirm)
        :return: a ``RecordList`` with the next page
        """
        ...

    async def previous_page(self, *, record_list: RecordList) -> RecordList:
        """Get the previous page of records.

        :param record_list: a record list (NOTE(review): presumably the current
                    page, e.g. one returned by ``search`` - confirm)
        :return: a ``RecordList`` with the previous page
        """
        ...

    async def scan(
        self,
        *,
        q: Optional[str] = None,
        model: str | None = None,
        status: RecordStatus | None = None,
        **facets: str,
    ) -> AbstractAsyncContextManager[AsyncIterator[Record]]:
        """Scan all the records in the repository.

        Tries to return all matched records in a consistent manner
        (using snapshots if they are available), but this behaviour is not guaranteed.

        The implementation might rely on specific sorting, so you should not modify it
        unless you know what you are doing.

        Usage:

        ```
        async with client.scan(...) as records:
            async for record in records:
                print(record)
        ```

        The parameters mirror those of ``search`` without paging
        (NOTE(review): their exact meaning is not visible here - confirm).

        :return: an async context manager that provides an async iterator of records
        """
        ...


class AsyncRepositoryClient(Protocol):
    """An abstract client for NRP repositories.

    Usually, subclasses of this class are not instantiated directly in your code.
    To get an instance of a repository, use the high-level method `get_async_client`:

    ```
    my_client = await async_client(config?, url=url, refresh=False/True)
    my_client = await async_client(config?, alias=alias, refresh=False/True)
    ```

    and then use the instance.
    """

    # NOTE(review): the docstring above names `get_async_client` while its
    # examples call `async_client` - confirm which one is the public entry point.

    # region info endpoint
    @classmethod
    async def can_handle_repository(cls, url: URL | str, verify_tls: bool = True) -> URL | None:
        """Return if this client can handle a repository that contains the passed URL.

        This method can make an http request or use any means to check if the repository
        at the URL can be handled by this client.

        :param url: any url within the repository (root, record api, html, documentation
                    if running on the same host, ...)
        :param verify_tls: whether to verify tls (should be switched on for production)
        :return: API url of the server or None if this client can not handle the repository
        """
        ...

    @classmethod
    async def from_configuration(cls,
                                 config: RepositoryConfig,
                                 refresh: bool = False,
                                 limiter: Limiter | None = None,
                                 extra_tokens: dict[URL, str] | None = None
                                 ) -> Self:
        """Create a client from the given configuration.

        :param config: the configuration for the repository
        :param refresh: refresh the configuration by calling get_repository_info
        :param limiter: optional ``Limiter`` (NOTE(review): presumably throttles
                        the client's connections - confirm)
        :param extra_tokens: optional mapping of URL to string (NOTE(review):
                        presumably access tokens for additional hosts - confirm)
        :return: an instance of this client class
        """
        ...

    async def get_repository_info(self, refresh: bool = True) -> RepositoryInfo:
        """Get information about the repository.

        This call is cached inside the RepositoryConfig instance.

        :param refresh: refresh the info from the server
        :return: the repository information
        """
        ...

    # endregion

    @property
    def records(self) -> AsyncRecordsClient:
        """Return client for accessing records."""
        ...

    # region records endpoint

    # endregion

    # region files endpoint



    # endregion

    # region requests endpoint

    # endregion
class RecordStatus(Enum):
    """Selector for records.

    Passed as the ``status`` argument of the record operations to choose
    which records are addressed.
    """

    ALL = "all"
    """All records"""

    PUBLISHED = "published"
    """Published records"""

    DRAFT = "draft"
    """Draft records"""
Selector for records.
class AsyncRecordsClient(Protocol):
    """Client class for access to records.

    This is a protocol: every method body is a ``...`` placeholder and the
    behaviour is supplied by concrete implementations.
    """

    def with_model(self, model: str) -> Self:
        """Return a new client limited to the given model.

        :param model: name of the model the returned client is limited to
        :return: a client of the same type as this one
        """
        ...

    @property
    def published_records(self) -> Self:
        """Return a new client limited to published records.

        :return: a client of the same type as this one
        """
        ...

    @property
    def draft_records(self) -> Self:
        """Return a new client limited to draft records.

        :return: a client of the same type as this one
        """
        ...

    async def create(
        self,
        data: dict[str, Any],
        *,
        model: str | None = None,
        community: str | None = None,
        workflow: str | None = None,
        idempotent: bool = False,
        files_enabled: bool = True,
    ) -> Record:
        """Create a new record in the repository.

        :param data: the metadata of the record
        :param model: optional model of the record
                    (NOTE(review): presumably not needed on a client obtained
                    via ``with_model`` - confirm)
        :param community: community in which the record should be created
        :param workflow: the workflow to use for the record, if not provided
                        the default workflow of the community is used
        :param idempotent: if True, the operation is idempotent and can be retried on network errors.
                        Use only if you know that the operation is idempotent, for example that
                        you use PID generator that takes the persistent identifier from the data.
        :param files_enabled: NOTE(review): presumably controls whether files can be
                        attached to the created record - confirm
        :return: the created record
        """
        ...

    async def read(
        self,
        record_id: RecordId,
        *,
        model: str | None = None,
        status: RecordStatus | None = None,
        **query: str
    ) -> Record:
        """Read a record from the repository.

        :param record_id: the id of the record. Could be either pid or url
        :param model: optional model of the record
        :param status: optional ``RecordStatus`` selector
                    (NOTE(review): presumably picks the published or the draft
                    version of the record - confirm)
        :param query: extra arguments to read, repository specific
        :return: the record
        """
        ...

    async def update(
        self,
        record: Record,
        *,
        verify_version: bool = True
    ) -> Record:
        """Update a record in the repository.

        The record must have an id and optionally
        an etag. If the etag is not provided, the record is updated without checking
        the etag. If the etag is provided, the record is updated only if the etag matches
        the current etag of the record in the repository.

        An updated version, as stored in the repository, is returned.

        :param record: record that will be stored to the server
        :param verify_version: if set to true, verify that the record
                               on the server has not been modified in the meantime
        :return: the updated record, as stored in the repository
        """
        ...

    async def patch(
        self,
        record_id: RecordId,
        metadata: dict[str, Any],
        *,
        model: str | None = None,
        status: RecordStatus | None = None,
        etag: str | None = None,
    ) -> Record:
        """Patch a record in the repository.

        The record is deep-patched with the metadata, using the following algorithm:

        * if a key in a dictionary is called 'replace:<key>', the whole content will be replaced.
        * dictionaries: are merged, with primitive values from metadata overriding
          values in the record. None in the metadata removes the key from the record
        * arrays: values from the metadata will be added at the end.

        :param record_id: identification of the record
        :param metadata: dictionary containing the metadata
        :param model: optional model of the record
        :param status: optional ``RecordStatus`` selector
                    (NOTE(review): exact effect is not visible here - confirm)
        :param etag: if set, check that we are patching this etag
        :return: a ``Record`` (NOTE(review): presumably the patched record as
                 stored in the repository - confirm)
        """
        ...

    async def delete(
        self,
        record_id_or_record : RecordId | Record,
        *,
        etag: str | None = None,
        status: RecordStatus | None = None,
    ) -> None:
        """Delete a record inside the repository.

        :param record_id_or_record: either the identification of the record or a record
                    downloaded from the repository. If an identification is passed, you can
                    specify the etag as well
        :param etag: if a record identification is passed, delete the record only
                    if the version matches
        :param status: optional ``RecordStatus`` selector
                    (NOTE(review): exact effect is not visible here - confirm)
        """
        ...

    async def search(
        self,
        *,
        q: Optional[str] = None,
        page: Optional[int] = None,
        size: Optional[int] = None,
        model: str | None = None,
        status: RecordStatus | None = None,
        **facets: str,
    ) -> RecordList:
        """Search for records in the repository.

        NOTE(review): the parameter descriptions below are inferred from the
        parameter names only - confirm against a concrete implementation.

        :param q: optional query string
        :param page: optional page number
        :param size: optional page size
        :param model: optional model the search is limited to
        :param status: optional ``RecordStatus`` selector
        :param facets: extra keyword arguments, presumably facet filters
        :return: a ``RecordList``
        """
        ...

    async def next_page(self, *, record_list: RecordList) -> RecordList:
        """Get the next page of records.

        :param record_list: a record list (NOTE(review): presumably the current
                    page, e.g. one returned by ``search`` - confirm)
        :return: a ``RecordList`` with the next page
        """
        ...

    async def previous_page(self, *, record_list: RecordList) -> RecordList:
        """Get the previous page of records.

        :param record_list: a record list (NOTE(review): presumably the current
                    page, e.g. one returned by ``search`` - confirm)
        :return: a ``RecordList`` with the previous page
        """
        ...

    async def scan(
        self,
        *,
        q: Optional[str] = None,
        model: str | None = None,
        status: RecordStatus | None = None,
        **facets: str,
    ) -> AbstractAsyncContextManager[AsyncIterator[Record]]:
        """Scan all the records in the repository.

        Tries to return all matched records in a consistent manner
        (using snapshots if they are available), but this behaviour is not guaranteed.

        The implementation might rely on specific sorting, so you should not modify it
        unless you know what you are doing.

        Usage:

        ```
        async with client.scan(...) as records:
            async for record in records:
                print(record)
        ```

        The parameters mirror those of ``search`` without paging
        (NOTE(review): their exact meaning is not visible here - confirm).

        :return: an async context manager that provides an async iterator of records
        """
        ...
Client class for access to records.
def _no_init_or_replace_init(self, *args, **kwargs):
    """Placeholder ``__init__`` installed on protocol classes and their subclasses.

    Refuses to instantiate a class flagged as a protocol. For any other class
    that still has this placeholder as its ``__init__``, the first real
    ``__init__`` found along the MRO is installed on the class and then
    called with the given arguments.
    """
    owner = type(self)

    if owner._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # A custom `__init__` is already in place, so there is nothing to
    # resolve. Resolving anyway can lead to RecursionError. See bpo-45121.
    if owner.__init__ is not _no_init_or_replace_init:
        return

    # A protocol subclass starts with this placeholder as its `__init__`.
    # On first instantiation the MRO is searched for a real `__init__`,
    # which then replaces the placeholder on the class, so subsequent
    # instantiations no longer pass through this function.
    declared_inits = (
        ancestor.__dict__.get('__init__', _no_init_or_replace_init)
        for ancestor in owner.__mro__
    )
    owner.__init__ = next(
        (
            candidate
            for candidate in declared_inits
            if candidate is not _no_init_or_replace_init
        ),
        object.__init__,  # fallback, should not happen
    )

    owner.__init__(self, *args, **kwargs)
def with_model(self, model: str) -> Self:
    """Return a new client limited to the given model.

    :param model: name of the model the returned client is limited to
    :return: a client of the same type as this one
    """
    ...
Return a new client limited to the given model.
@property
def published_records(self) -> Self:
    """Return a new client limited to published records.

    :return: a client of the same type as this one
    """
    ...
Return a new client limited to published records.
@property
def draft_records(self) -> Self:
    """Return a new client limited to draft records.

    :return: a client of the same type as this one
    """
    ...
Return a new client limited to draft records.
async def create(
    self,
    data: dict[str, Any],
    *,
    model: str | None = None,
    community: str | None = None,
    workflow: str | None = None,
    idempotent: bool = False,
    files_enabled: bool = True,
) -> Record:
    """Create a new record in the repository.

    :param data: the metadata of the record
    :param model: optional model of the record
                (NOTE(review): presumably not needed on a client obtained
                via ``with_model`` - confirm)
    :param community: community in which the record should be created
    :param workflow: the workflow to use for the record, if not provided
                    the default workflow of the community is used
    :param idempotent: if True, the operation is idempotent and can be retried on network errors.
                    Use only if you know that the operation is idempotent, for example that
                    you use PID generator that takes the persistent identifier from the data.
    :param files_enabled: NOTE(review): presumably controls whether files can be
                    attached to the created record - confirm
    :return: the created record
    """
    ...
Create a new record in the repository.
Parameters
- data: the metadata of the record
- community: community in which the record should be created
- workflow: the workflow to use for the record, if not provided the default workflow of the community is used
- idempotent: if True, the operation is idempotent and can be retried on network errors. Use only if you know that the operation is idempotent, for example that you use PID generator that takes the persistent identifier from the data.
Returns
the created record
async def read(
    self,
    record_id: RecordId,
    *,
    model: str | None = None,
    status: RecordStatus | None = None,
    **query: str
) -> Record:
    """Read a record from the repository.

    :param record_id: the id of the record. Could be either pid or url
    :param model: optional model of the record
    :param status: optional ``RecordStatus`` selector
                (NOTE(review): presumably picks the published or the draft
                version of the record - confirm)
    :param query: extra arguments to read, repository specific
    :return: the record
    """
    ...
Read a record from the repository. The record is identified by record_id, which may be either a pid or a url.
Parameters
- record_id: the id of the record. Could be either pid or url
- model: optional model of the record
- query: extra arguments to read, repository specific
Returns
the record
async def update(
    self,
    record: Record,
    *,
    verify_version: bool = True
) -> Record:
    """Update a record in the repository.

    The record must have an id and optionally
    an etag. If the etag is not provided, the record is updated without checking
    the etag. If the etag is provided, the record is updated only if the etag matches
    the current etag of the record in the repository.

    An updated version, as stored in the repository, is returned.

    :param record: record that will be stored to the server
    :param verify_version: if set to true, verify that the record
                           on the server has not been modified in the meantime
    :return: the updated record, as stored in the repository
    """
    ...
Update a record in the repository.
The record must have an id and optionally an etag. If the etag is not provided, the record is updated without checking the etag. If the etag is provided, the record is updated only if the etag matches the current etag of the record in the repository.
An updated version, as stored in the repository, is returned.
Parameters
- record: record that will be stored to the server
- verify_version: if set to true, verify that the record on the server has not been modified in the meantime
async def patch(
    self,
    record_id: RecordId,
    metadata: dict[str, Any],
    *,
    model: str | None = None,
    status: RecordStatus | None = None,
    etag: str | None = None,
) -> Record:
    """Patch a record in the repository.

    The record is deep-patched with the metadata, using the following algorithm:

    * if a key in a dictionary is called 'replace:<key>', the whole content will be replaced.
    * dictionaries: are merged, with primitive values from metadata overriding
      values in the record. None in the metadata removes the key from the record
    * arrays: values from the metadata will be added at the end.

    :param record_id: identification of the record
    :param metadata: dictionary containing the metadata
    :param model: optional model of the record
    :param status: optional ``RecordStatus`` selector
                (NOTE(review): exact effect is not visible here - confirm)
    :param etag: if set, check that we are patching this etag
    :return: a ``Record`` (NOTE(review): presumably the patched record as
             stored in the repository - confirm)
    """
    ...
Patch a record in the repository.
The record is deep-patched with the metadata, using the following algorithm:
- if a key in a dictionary is called 'replace:<key>', the whole content will be replaced.
- dictionaries: are merged, with primitive values from metadata overriding values in the record. None in the metadata removes the key from the record
- arrays: values from the metadata will be added at the end.
Parameters
- record_id: identification of the record
- metadata: dictionary containing the metadata
- etag: if set, check that we are patching this etag
async def delete(
    self,
    record_id_or_record : RecordId | Record,
    *,
    etag: str | None = None,
    status: RecordStatus | None = None,
) -> None:
    """Delete a record inside the repository.

    :param record_id_or_record: either the identification of the record or a record
                downloaded from the repository. If an identification is passed, you can
                specify the etag as well
    :param etag: if a record identification is passed, delete the record only
                if the version matches
    :param status: optional ``RecordStatus`` selector
                (NOTE(review): exact effect is not visible here - confirm)
    """
    ...
Delete a record inside the repository.
Parameters
- record_id_or_record: either the identification of the record or a record downloaded from the repository. If an identification is passed, you can specify the etag as well
- etag: if a record identification is passed, delete the record only if the version matches
async def search(
    self,
    *,
    q: Optional[str] = None,
    page: Optional[int] = None,
    size: Optional[int] = None,
    model: str | None = None,
    status: RecordStatus | None = None,
    **facets: str,
) -> RecordList:
    """Search for records in the repository.

    NOTE(review): the parameter descriptions below are inferred from the
    parameter names only - confirm against a concrete implementation.

    :param q: optional query string
    :param page: optional page number
    :param size: optional page size
    :param model: optional model the search is limited to
    :param status: optional ``RecordStatus`` selector
    :param facets: extra keyword arguments, presumably facet filters
    :return: a ``RecordList``
    """
    ...
Search for records in the repository.
async def next_page(self, *, record_list: RecordList) -> RecordList:
    """Get the next page of records.

    :param record_list: a record list (NOTE(review): presumably the current
                page, e.g. one returned by ``search`` - confirm)
    :return: a ``RecordList`` with the next page
    """
    ...
Get the next page of records.
async def previous_page(self, *, record_list: RecordList) -> RecordList:
    """Get the previous page of records.

    :param record_list: a record list (NOTE(review): presumably the current
                page, e.g. one returned by ``search`` - confirm)
    :return: a ``RecordList`` with the previous page
    """
    ...
Get the previous page of records.
async def scan(
    self,
    *,
    q: Optional[str] = None,
    model: str | None = None,
    status: RecordStatus | None = None,
    **facets: str,
) -> AbstractAsyncContextManager[AsyncIterator[Record]]:
    """Scan all the records in the repository.

    Tries to return all matched records in a consistent manner
    (using snapshots if they are available), but this behaviour is not guaranteed.

    The implementation might rely on specific sorting, so you should not modify it
    unless you know what you are doing.

    Usage:

    ```
    async with client.scan(...) as records:
        async for record in records:
            print(record)
    ```

    The parameters mirror those of ``search`` without paging
    (NOTE(review): their exact meaning is not visible here - confirm).

    :return: an async context manager that provides an async iterator of records
    """
    ...
Scan all the records in the repository.
Tries to return all matched records in a consistent manner (using snapshots if they are available), but this behaviour is not guaranteed.
The implementation might rely on specific sorting, so you should not modify it unless you know what you are doing.
Usage:
async with client.scan(...) as records:
    async for record in records:
        print(record)
class AsyncRepositoryClient(Protocol):
    """An abstract client for NRP repositories.

    Usually, subclasses of this class are not instantiated directly in your code.
    To get an instance of a repository, use the high-level method `get_async_client`:

    ```
    my_client = await async_client(config?, url=url, refresh=False/True)
    my_client = await async_client(config?, alias=alias, refresh=False/True)
    ```

    and then use the instance.
    """

    # NOTE(review): the docstring above names `get_async_client` while its
    # examples call `async_client` - confirm which one is the public entry point.

    # region info endpoint
    @classmethod
    async def can_handle_repository(cls, url: URL | str, verify_tls: bool = True) -> URL | None:
        """Return if this client can handle a repository that contains the passed URL.

        This method can make an http request or use any means to check if the repository
        at the URL can be handled by this client.

        :param url: any url within the repository (root, record api, html, documentation
                    if running on the same host, ...)
        :param verify_tls: whether to verify tls (should be switched on for production)
        :return: API url of the server or None if this client can not handle the repository
        """
        ...

    @classmethod
    async def from_configuration(cls,
                                 config: RepositoryConfig,
                                 refresh: bool = False,
                                 limiter: Limiter | None = None,
                                 extra_tokens: dict[URL, str] | None = None
                                 ) -> Self:
        """Create a client from the given configuration.

        :param config: the configuration for the repository
        :param refresh: refresh the configuration by calling get_repository_info
        :param limiter: optional ``Limiter`` (NOTE(review): presumably throttles
                        the client's connections - confirm)
        :param extra_tokens: optional mapping of URL to string (NOTE(review):
                        presumably access tokens for additional hosts - confirm)
        :return: an instance of this client class
        """
        ...

    async def get_repository_info(self, refresh: bool = True) -> RepositoryInfo:
        """Get information about the repository.

        This call is cached inside the RepositoryConfig instance.

        :param refresh: refresh the info from the server
        :return: the repository information
        """
        ...

    # endregion

    @property
    def records(self) -> AsyncRecordsClient:
        """Return client for accessing records."""
        ...

    # region records endpoint

    # endregion

    # region files endpoint



    # endregion

    # region requests endpoint

    # endregion
An abstract client for NRP repositories.
Usually, subclasses of this class are not instantiated directly in your code.
To get an instance of a repository, use the high-level method get_async_client:
my_client = await async_client(config?, url=url, refresh=False/True)
my_client = await async_client(config?, alias=alias, refresh=False/True)
and then use the instance.
def _no_init_or_replace_init(self, *args, **kwargs):
    """Placeholder ``__init__`` installed on protocol classes and their subclasses.

    Refuses to instantiate a class flagged as a protocol. For any other class
    that still has this placeholder as its ``__init__``, the first real
    ``__init__`` found along the MRO is installed on the class and then
    called with the given arguments.
    """
    owner = type(self)

    if owner._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # A custom `__init__` is already in place, so there is nothing to
    # resolve. Resolving anyway can lead to RecursionError. See bpo-45121.
    if owner.__init__ is not _no_init_or_replace_init:
        return

    # A protocol subclass starts with this placeholder as its `__init__`.
    # On first instantiation the MRO is searched for a real `__init__`,
    # which then replaces the placeholder on the class, so subsequent
    # instantiations no longer pass through this function.
    declared_inits = (
        ancestor.__dict__.get('__init__', _no_init_or_replace_init)
        for ancestor in owner.__mro__
    )
    owner.__init__ = next(
        (
            candidate
            for candidate in declared_inits
            if candidate is not _no_init_or_replace_init
        ),
        object.__init__,  # fallback, should not happen
    )

    owner.__init__(self, *args, **kwargs)
@classmethod
async def can_handle_repository(cls, url: URL | str, verify_tls: bool = True) -> URL | None:
    """Return if this client can handle a repository that contains the passed URL.

    This method can make an http request or use any means to check if the repository
    at the URL can be handled by this client.

    Despite the "return if" wording, the result is not a boolean: it is the
    API url on success and None otherwise.

    :param url: any url within the repository (root, record api, html, documentation
                if running on the same host, ...)
    :param verify_tls: whether to verify tls (should be switched on for production)
    :return: API url of the server or None if this client can not handle the repository
    """
    ...
Return if this client can handle a repository that contains the passed URL.
This method can make an http request or use any means to check if the repository at the URL can be handled by this client.
Parameters
- url: any url within the repository (root, record api, html, documentation if running on the same host, ...)
- verify_tls: whether to verify tls (should be switched on for production)
Returns
API url of the server or None if this client can not handle the repository
@classmethod
async def from_configuration(cls,
                             config: RepositoryConfig,
                             refresh: bool = False,
                             limiter: Limiter | None = None,
                             extra_tokens: dict[URL, str] | None = None
                             ) -> Self:
    """Create a client from the given configuration.

    :param config: the configuration for the repository
    :param refresh: refresh the configuration by calling get_repository_info
    :param limiter: optional ``Limiter`` (NOTE(review): presumably throttles
                    the client's connections - confirm)
    :param extra_tokens: optional mapping of URL to string (NOTE(review):
                    presumably access tokens for additional hosts - confirm)
    :return: an instance of this client class
    """
    ...
Create a client from the given configuration.
Parameters
- config: the configuration for the repository
- refresh: refresh the configuration by calling get_repository_info
async def get_repository_info(self, refresh: bool = True) -> RepositoryInfo:
    """Get information about the repository.

    This call is cached inside the RepositoryConfig instance.

    :param refresh: refresh the info from the server
    :return: the repository information
    """
    ...
Get information about the repository.
This call is cached inside the RepositoryConfig instance.
Parameters
- refresh: refresh the info from the server
@property
def records(self) -> AsyncRecordsClient:
    """Return client for accessing records.

    :return: an object implementing the ``AsyncRecordsClient`` protocol
    """
    ...
Return client for accessing records.