nrp_cmd.async_client.connection
Asynchronous connection for the NRP client.
1# 2# Copyright (C) 2024 CESNET z.s.p.o. 3# 4# invenio-nrp is free software; you can redistribute it and/or 5# modify it under the terms of the MIT License; see LICENSE file for more 6# details. 7# 8"""Asynchronous connection for the NRP client.""" 9 10from .connection import AsyncConnection 11from .limiter import Limiter 12 13__all__ = ("AsyncConnection", "Limiter")
class
AsyncConnection:
100class AsyncConnection: 101 """Pre-configured asynchronous http connection.""" 102 103 def __init__( 104 self, 105 *, 106 limiter: Limiter | None = None, 107 tokens: dict[URL, str] | None = None, 108 verify_tls: bool = True, 109 retry_count: int = 5, 110 retry_after_seconds: int = 1, 111 ): 112 """Create a new connection with the given configuration.""" 113 self._limiter = limiter or Limiter(10) 114 self._verify_tls = verify_tls 115 self._retry_count = retry_count 116 self._retry_after_seconds = retry_after_seconds 117 118 _tokens: list[BearerTokenForHost] = [ 119 BearerTokenForHost(host_url=url, token=token) 120 for url, token in (tokens or {}).items() 121 if token 122 ] 123 self._auth = BearerAuthentication(_tokens) 124 125 @contextlib.asynccontextmanager 126 async def _client( 127 self, idempotent: bool = False 128 ) -> AsyncGenerator[HttpClient, None]: 129 """Create a new session with the repository and configure it with the token. 130 131 :return: A new http client 132 """ 133 """ 134 Create a new session with the repository and configure it with the token. 135 :return: A new http client 136 """ 137 138 connector = TCPConnector(verify_ssl=self._verify_tls) 139 async with ClientSession( 140 request_class=AuthenticatedClientRequest, 141 response_class=RepositoryResponse, 142 connector=connector, 143 ) as session: 144 retry_client = RetryClient( 145 client_session=session, 146 retry_options=ServerAssistedRetry( 147 attempts=self._retry_count, 148 start_timeout=self._retry_after_seconds, 149 ), 150 ) 151 yield retry_client 152 153 async def head( 154 self, 155 *, 156 url: URL, 157 use_get: bool = False, 158 **kwargs: Any, # noqa: ANN401 159 ) -> CIMultiDictProxy[str]: 160 """Perform a HEAD request to the repository. 161 162 :param url: the url of the request 163 :param idempotent: True if the request is idempotent, should be for HEAD requests 164 :param kwargs: any kwargs to pass to the aiohttp client 165 :return: None 166 167 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 168 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 169 :raises RepositoryCommunicationError: if the request fails due to network 170 """ 171 async def _head(response: ClientResponse) -> CIMultiDictProxy[str]: 172 return response.headers 173 174 if use_get: 175 return await self._retried("GET", url, _head, idempotent=True, 176 headers={"Range": "bytes=0-0"}) 177 else: 178 return await self._retried("HEAD", url, _head, idempotent=True) 179 180 async def get[T]( 181 self, 182 *, 183 url: URL, 184 result_class: type[T], 185 **kwargs: Any, # noqa: ANN401 186 ) -> T: 187 """Perform a GET request to the repository. 188 189 :param url: the url of the request 190 :param idempotent: True if the request is idempotent, should be for GET requests 191 :param result_class: successful response will be parsed to this class 192 :param kwargs: any kwargs to pass to the aiohttp client 193 :return: the parsed result 194 195 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 196 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 197 :raises RepositoryCommunicationError: if the request fails due to network error 198 """ 199 return await self._retried( 200 "GET", 201 url, 202 partial( 203 self._get_call_result, 204 result_class=result_class, 205 ), 206 idempotent=True, 207 **kwargs 208 ) 209 210 async def post[T]( 211 self, 212 *, 213 url: URL, 214 json: dict[str, Any] | list[Any] | None = None, 215 data: bytes | None = None, 216 idempotent: bool = False, 217 result_class: type[T], 218 **kwargs: Any, # noqa: ANN401 219 ) -> T: 220 """Perform a POST request to the repository. 221 222 :param url: the url of the request 223 :param json: the json payload of the request (use exactly one of json or data) 224 :param data: the data payload of the request 225 :param idempotent: True if the request is idempotent, normally should be False 226 :param result_class: successful response will be parsed to this class 227 :param kwargs: any kwargs to pass to the aiohttp client 228 :return: the parsed result 229 230 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 231 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 232 :raises RepositoryCommunicationError: if the request fails due to network 233 """ 234 assert ( 235 json is not None or data is not None 236 ), "Either json or data must be provided" 237 238 return await self._retried( 239 "POST", 240 url, 241 partial( 242 self._get_call_result, 243 result_class=result_class, 244 ), 245 idempotent=idempotent, 246 json=json, 247 data=data, 248 **kwargs 249 ) 250 251 async def put[T]( 252 self, 253 *, 254 url: URL, 255 json: dict[str, Any] | list[Any] | None = None, 256 data: bytes | None = None, 257 result_class: type[T], 258 **kwargs: Any, # noqa: ANN401 259 ) -> T: 260 """Perform a PUT request to the repository. 261 262 :param url: the url of the request 263 :param json: the json payload of the request (use exactly one of json or data) 264 :param data: the data payload of the request 265 :param result_class: successful response will be parsed to this class 266 :param kwargs: any kwargs to pass to the aiohttp client 267 :return: the parsed result 268 269 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 270 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 271 :raises RepositoryCommunicationError: if the request fails due to network 272 """ 273 assert ( 274 json is not None or data is not None 275 ), "Either json or data must be provided" 276 277 return await self._retried( 278 "PUT", 279 url, 280 partial( 281 self._get_call_result, 282 result_class=result_class, 283 ), 284 idempotent=True, 285 json=json, 286 data=data, 287 **kwargs, 288 ) 289 290 async def put_stream( 291 self, 292 *, 293 url: URL, 294 source: DataSource, 295 open_kwargs: dict[str, Any] | None = None, 296 **kwargs: Any, # noqa: ANN401 297 ) -> ClientResponse: 298 """Perform a PUT request to the repository with a file. 299 300 :param url: the url of the request 301 :param file: the file to send 302 :param kwargs: any kwargs to pass to the aiohttp client 303 :return: the response (not parsed) 304 305 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 306 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 307 :raises RepositoryCommunicationError: if the request fails due to network 308 """ 309 310 async def _put(response: ClientResponse) -> ClientResponse: 311 return response 312 313 return await self._retried( 314 "PUT", 315 url, 316 _put, 317 idempotent=True, 318 data=partial(source.open, **(open_kwargs or {})), 319 **kwargs 320 ) 321 322 async def get_stream( 323 self, 324 *, 325 url: URL, 326 sink: DataSink, 327 offset: int = 0, 328 size: int | None = None, 329 **kwargs: Any, # noqa: ANN401 330 ) -> None: 331 """Perform a GET request to the repository and write the response to a sink. 332 333 :param url: the url of the request 334 :param kwargs: any kwargs to pass to the aiohttp client 335 :return: the parsed result 336 337 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 338 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 339 :raises RepositoryCommunicationError: if the request fails due to network error 340 """ 341 342 async def _copy_stream(response: ClientResponse) -> None: 343 chunk = await sink.open_chunk(offset=offset) 344 try: 345 async for data in response.content.iter_any(): 346 await chunk.write(data) 347 finally: 348 await chunk.close() 349 350 if size is not None: 351 range_header = f"bytes={offset}-{offset + size - 1}" 352 else: 353 range_header = f"bytes={offset}-" 354 355 await self._retried( 356 "GET", 357 url, 358 _copy_stream, 359 idempotent=True, 360 headers={"Range": range_header}, 361 **kwargs 362 ) 363 364 async def delete( 365 self, 366 *, 367 url: URL, 368 idempotent: bool = False, 369 **kwargs: Any, # noqa: ANN401 370 ) -> None: 371 """Perform a DELETE request to the repository. 372 373 :param url: the url of the request 374 :param idempotent: True if the request is idempotent, normally should be False 375 :param kwargs: any kwargs to pass to the aiohttp client 376 :return: None 377 378 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 379 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 380 :raises RepositoryCommunicationError: if the request fails due to network 381 """ 382 return await self._retried( 383 "DELETE", 384 url, 385 None, 386 idempotent=idempotent, 387 **kwargs 388 ) 389 390 @overload 391 async def _get_call_result[T]( 392 self, 393 response: ClientResponse, 394 result_class: type[T], 395 ) -> T: ... 396 397 @overload 398 async def _get_call_result( 399 self, 400 response: ClientResponse, 401 result_class: None, 402 ) -> None: ... 403 404 async def _get_call_result[T]( 405 self, 406 response: ClientResponse, 407 result_class: type[T] | None, 408 ) -> T | None: 409 """Get the result from the response. 410 411 :param response: the aiohttp response 412 :param result_class: the class to parse the response to 413 :return: the parsed result 414 415 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 416 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 417 :raises RepositoryCommunicationError: if the request fails due to network 418 """ 419 await response.raise_for_invenio_status() # type: ignore 420 if response.status == 204: 421 assert result_class is None 422 return None 423 json_payload = await response.read() 424 assert result_class is not None 425 if issubclass(result_class, ClientResponse): 426 return cast(T, response) # mypy can not get it 427 elif issubclass(result_class, str): 428 return cast(T, json_payload.decode('utf-8')) # mypy can not get it 429 elif issubclass(result_class, dict): 430 return _json.loads(json_payload) 431 etag = remove_quotes(response.headers.get("ETag")) 432 return deserialize_rest_response( 433 self, communication_log, json_payload, result_class, etag 434 ) 435 436 @overload 437 async def _retried[T]( 438 self, 439 method: str, 440 url: URL, 441 callback: Callable[[ClientResponse], Awaitable[T]], 442 idempotent: bool, 443 **kwargs: Any, # noqa: ANN401 444 ) -> T: ... 445 446 @overload 447 async def _retried( 448 self, 449 method: str, 450 url: URL, 451 callback: None, 452 idempotent: bool, 453 **kwargs: Any, # noqa: ANN401 454 ) -> None: ... 455 456 async def _retried[T]( 457 self, 458 method: str, 459 url: URL, 460 callback: Callable[[ClientResponse], Awaitable[T]] | None, 461 idempotent: bool, 462 **kwargs: Any, # noqa: ANN401 463 ) -> T | None: 464 """Log the start of a request and retry it if necessary.""" 465 json = kwargs.get("json") 466 if json is not None and callable(json): 467 json = json() 468 kwargs["json"] = json 469 470 data = kwargs.get("data") 471 472 if communication_log.isEnabledFor(logging.INFO): 473 communication_log.info("%s %s", method.upper(), url) 474 if json is not None: 475 communication_log.info("%s", _json.dumps(json)) 476 if data is not None: 477 communication_log.info("(stream)") 478 479 for attempt in try_until_success( 480 self._retry_count if idempotent else 1 481 ): 482 actual_data = None 483 if data is not None and callable(data): 484 actual_data = await data() 485 kwargs["data"] = actual_data 486 try: 487 async with ( 488 attempt, 489 self._limiter, 490 self._client(idempotent=True) as client, 491 _cast_error(), 492 client.request(method, url, auth=self._auth, **kwargs) as response, 493 ): 494 await response.raise_for_invenio_status() # type: ignore 495 if callback is not None: 496 return await callback(response) 497 return None 498 finally: 499 if actual_data is not None and hasattr(actual_data, "close"): 500 await actual_data.close() 501 502 raise Exception("unreachable") 503 504 async def download_file( 505 self, url: URL, sink: DataSink, 506 parts: int | None =None, 507 part_size : int | None = None) -> None: 508 509 try: 510 headers = await self.head(url=url) 511 except RepositoryClientError: 512 # The file is not available for HEAD. This is the case for S3 files 513 # where the file is a pre-signed request. We'll try to download the headers 514 # with a GET request with a range header containing only the first byte. 515 headers = await self.head(url=url, use_get=True) 516 517 size = 0 518 location = URL(headers.get('Location', url)) 519 520 if "Content-Length" in headers: 521 size = int(headers["Content-Length"]) 522 await sink.allocate(size) 523 524 if ( 525 size 526 and size > MINIMAL_DOWNLOAD_PART_SIZE 527 and any(x == "bytes" for x in headers.getall("Accept-Ranges")) 528 ): 529 await self._download_multipart(location, sink, size, parts, part_size) 530 else: 531 await self._download_single(location, sink) 532 533 async def _download_single(self, url: URL, sink: DataSink) -> None: 534 await self.get_stream(url=url, sink=sink, offset=0) 535 536 async def _download_multipart( 537 self, url: URL, sink: DataSink, size: int, parts: int | None = None, part_size: int | None = None 538 ) -> None: 539 adjusted_parts, adjusted_part_size = adjust_download_multipart_params(size, parts, part_size) 540 541 async with asyncio.TaskGroup() as tg: 542 for i in range(adjusted_parts): 543 start = i * adjusted_part_size 544 size = min((i + 1) * adjusted_part_size, size) - start 545 tg.create_task( 546 self.get_stream( 547 url=url, sink=sink, offset=start, size=size 548 ) 549 )
Pre-configured asynchronous http connection.
AsyncConnection( *, limiter: Limiter | None = None, tokens: dict[yarl.URL, str] | None = None, verify_tls: bool = True, retry_count: int = 5, retry_after_seconds: int = 1)
103 def __init__( 104 self, 105 *, 106 limiter: Limiter | None = None, 107 tokens: dict[URL, str] | None = None, 108 verify_tls: bool = True, 109 retry_count: int = 5, 110 retry_after_seconds: int = 1, 111 ): 112 """Create a new connection with the given configuration.""" 113 self._limiter = limiter or Limiter(10) 114 self._verify_tls = verify_tls 115 self._retry_count = retry_count 116 self._retry_after_seconds = retry_after_seconds 117 118 _tokens: list[BearerTokenForHost] = [ 119 BearerTokenForHost(host_url=url, token=token) 120 for url, token in (tokens or {}).items() 121 if token 122 ] 123 self._auth = BearerAuthentication(_tokens)
Create a new connection with the given configuration.
async def
head( self, *, url: yarl.URL, use_get: bool = False, **kwargs: Any) -> multidict._multidict.CIMultiDictProxy[str]:
153 async def head( 154 self, 155 *, 156 url: URL, 157 use_get: bool = False, 158 **kwargs: Any, # noqa: ANN401 159 ) -> CIMultiDictProxy[str]: 160 """Perform a HEAD request to the repository. 161 162 :param url: the url of the request 163 :param idempotent: True if the request is idempotent, should be for HEAD requests 164 :param kwargs: any kwargs to pass to the aiohttp client 165 :return: None 166 167 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 168 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 169 :raises RepositoryCommunicationError: if the request fails due to network 170 """ 171 async def _head(response: ClientResponse) -> CIMultiDictProxy[str]: 172 return response.headers 173 174 if use_get: 175 return await self._retried("GET", url, _head, idempotent=True, 176 headers={"Range": "bytes=0-0"}) 177 else: 178 return await self._retried("HEAD", url, _head, idempotent=True)
Perform a HEAD request to the repository.
Parameters
- url: the url of the request
- idempotent: True if the request is idempotent, should be for HEAD requests
- kwargs: any kwargs to pass to the aiohttp client
Returns
None
Raises
- RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
- RepositoryServerError: if the request fails due to server error (HTTP 5xx)
- RepositoryCommunicationError: if the request fails due to network
async def
get(self, *, url: yarl.URL, result_class: type[T], **kwargs: Any) -> T:
180 async def get[T]( 181 self, 182 *, 183 url: URL, 184 result_class: type[T], 185 **kwargs: Any, # noqa: ANN401 186 ) -> T: 187 """Perform a GET request to the repository. 188 189 :param url: the url of the request 190 :param idempotent: True if the request is idempotent, should be for GET requests 191 :param result_class: successful response will be parsed to this class 192 :param kwargs: any kwargs to pass to the aiohttp client 193 :return: the parsed result 194 195 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 196 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 197 :raises RepositoryCommunicationError: if the request fails due to network error 198 """ 199 return await self._retried( 200 "GET", 201 url, 202 partial( 203 self._get_call_result, 204 result_class=result_class, 205 ), 206 idempotent=True, 207 **kwargs 208 )
Perform a GET request to the repository.
Parameters
- url: the url of the request
- idempotent: True if the request is idempotent, should be for GET requests
- result_class: successful response will be parsed to this class
- kwargs: any kwargs to pass to the aiohttp client
Returns
the parsed result
Raises
- RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
- RepositoryServerError: if the request fails due to server error (HTTP 5xx)
- RepositoryCommunicationError: if the request fails due to network error
async def
post( self, *, url: yarl.URL, json: dict[str, typing.Any] | list[typing.Any] | None = None, data: bytes | None = None, idempotent: bool = False, result_class: type[T], **kwargs: Any) -> T:
210 async def post[T]( 211 self, 212 *, 213 url: URL, 214 json: dict[str, Any] | list[Any] | None = None, 215 data: bytes | None = None, 216 idempotent: bool = False, 217 result_class: type[T], 218 **kwargs: Any, # noqa: ANN401 219 ) -> T: 220 """Perform a POST request to the repository. 221 222 :param url: the url of the request 223 :param json: the json payload of the request (use exactly one of json or data) 224 :param data: the data payload of the request 225 :param idempotent: True if the request is idempotent, normally should be False 226 :param result_class: successful response will be parsed to this class 227 :param kwargs: any kwargs to pass to the aiohttp client 228 :return: the parsed result 229 230 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 231 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 232 :raises RepositoryCommunicationError: if the request fails due to network 233 """ 234 assert ( 235 json is not None or data is not None 236 ), "Either json or data must be provided" 237 238 return await self._retried( 239 "POST", 240 url, 241 partial( 242 self._get_call_result, 243 result_class=result_class, 244 ), 245 idempotent=idempotent, 246 json=json, 247 data=data, 248 **kwargs 249 )
Perform a POST request to the repository.
Parameters
- url: the url of the request
- json: the json payload of the request (use exactly one of json or data)
- data: the data payload of the request
- idempotent: True if the request is idempotent, normally should be False
- result_class: successful response will be parsed to this class
- kwargs: any kwargs to pass to the aiohttp client
Returns
the parsed result
Raises
- RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
- RepositoryServerError: if the request fails due to server error (HTTP 5xx)
- RepositoryCommunicationError: if the request fails due to network
async def
put( self, *, url: yarl.URL, json: dict[str, typing.Any] | list[typing.Any] | None = None, data: bytes | None = None, result_class: type[T], **kwargs: Any) -> T:
251 async def put[T]( 252 self, 253 *, 254 url: URL, 255 json: dict[str, Any] | list[Any] | None = None, 256 data: bytes | None = None, 257 result_class: type[T], 258 **kwargs: Any, # noqa: ANN401 259 ) -> T: 260 """Perform a PUT request to the repository. 261 262 :param url: the url of the request 263 :param json: the json payload of the request (use exactly one of json or data) 264 :param data: the data payload of the request 265 :param result_class: successful response will be parsed to this class 266 :param kwargs: any kwargs to pass to the aiohttp client 267 :return: the parsed result 268 269 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 270 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 271 :raises RepositoryCommunicationError: if the request fails due to network 272 """ 273 assert ( 274 json is not None or data is not None 275 ), "Either json or data must be provided" 276 277 return await self._retried( 278 "PUT", 279 url, 280 partial( 281 self._get_call_result, 282 result_class=result_class, 283 ), 284 idempotent=True, 285 json=json, 286 data=data, 287 **kwargs, 288 )
Perform a PUT request to the repository.
Parameters
- url: the url of the request
- json: the json payload of the request (use exactly one of json or data)
- data: the data payload of the request
- result_class: successful response will be parsed to this class
- kwargs: any kwargs to pass to the aiohttp client
Returns
the parsed result
Raises
- RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
- RepositoryServerError: if the request fails due to server error (HTTP 5xx)
- RepositoryCommunicationError: if the request fails due to network
async def
put_stream( self, *, url: yarl.URL, source: nrp_cmd.async_client.streams.DataSource, open_kwargs: dict[str, typing.Any] | None = None, **kwargs: Any) -> aiohttp.client_reqrep.ClientResponse:
290 async def put_stream( 291 self, 292 *, 293 url: URL, 294 source: DataSource, 295 open_kwargs: dict[str, Any] | None = None, 296 **kwargs: Any, # noqa: ANN401 297 ) -> ClientResponse: 298 """Perform a PUT request to the repository with a file. 299 300 :param url: the url of the request 301 :param file: the file to send 302 :param kwargs: any kwargs to pass to the aiohttp client 303 :return: the response (not parsed) 304 305 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 306 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 307 :raises RepositoryCommunicationError: if the request fails due to network 308 """ 309 310 async def _put(response: ClientResponse) -> ClientResponse: 311 return response 312 313 return await self._retried( 314 "PUT", 315 url, 316 _put, 317 idempotent=True, 318 data=partial(source.open, **(open_kwargs or {})), 319 **kwargs 320 )
Perform a PUT request to the repository with a file.
Parameters
- url: the url of the request
- file: the file to send
- kwargs: any kwargs to pass to the aiohttp client
Returns
the response (not parsed)
Raises
- RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
- RepositoryServerError: if the request fails due to server error (HTTP 5xx)
- RepositoryCommunicationError: if the request fails due to network
async def
get_stream( self, *, url: yarl.URL, sink: nrp_cmd.async_client.streams.DataSink, offset: int = 0, size: int | None = None, **kwargs: Any) -> None:
322 async def get_stream( 323 self, 324 *, 325 url: URL, 326 sink: DataSink, 327 offset: int = 0, 328 size: int | None = None, 329 **kwargs: Any, # noqa: ANN401 330 ) -> None: 331 """Perform a GET request to the repository and write the response to a sink. 332 333 :param url: the url of the request 334 :param kwargs: any kwargs to pass to the aiohttp client 335 :return: the parsed result 336 337 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 338 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 339 :raises RepositoryCommunicationError: if the request fails due to network error 340 """ 341 342 async def _copy_stream(response: ClientResponse) -> None: 343 chunk = await sink.open_chunk(offset=offset) 344 try: 345 async for data in response.content.iter_any(): 346 await chunk.write(data) 347 finally: 348 await chunk.close() 349 350 if size is not None: 351 range_header = f"bytes={offset}-{offset + size - 1}" 352 else: 353 range_header = f"bytes={offset}-" 354 355 await self._retried( 356 "GET", 357 url, 358 _copy_stream, 359 idempotent=True, 360 headers={"Range": range_header}, 361 **kwargs 362 )
Perform a GET request to the repository and write the response to a sink.
Parameters
- url: the url of the request
- kwargs: any kwargs to pass to the aiohttp client
Returns
the parsed result
Raises
- RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
- RepositoryServerError: if the request fails due to server error (HTTP 5xx)
- RepositoryCommunicationError: if the request fails due to network error
async def
delete(self, *, url: yarl.URL, idempotent: bool = False, **kwargs: Any) -> None:
364 async def delete( 365 self, 366 *, 367 url: URL, 368 idempotent: bool = False, 369 **kwargs: Any, # noqa: ANN401 370 ) -> None: 371 """Perform a DELETE request to the repository. 372 373 :param url: the url of the request 374 :param idempotent: True if the request is idempotent, normally should be False 375 :param kwargs: any kwargs to pass to the aiohttp client 376 :return: None 377 378 :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx) 379 :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx) 380 :raises RepositoryCommunicationError: if the request fails due to network 381 """ 382 return await self._retried( 383 "DELETE", 384 url, 385 None, 386 idempotent=idempotent, 387 **kwargs 388 )
Perform a DELETE request to the repository.
Parameters
- url: the url of the request
- idempotent: True if the request is idempotent, normally should be False
- kwargs: any kwargs to pass to the aiohttp client
Returns
None
Raises
- RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
- RepositoryServerError: if the request fails due to server error (HTTP 5xx)
- RepositoryCommunicationError: if the request fails due to network
async def
download_file( self, url: yarl.URL, sink: nrp_cmd.async_client.streams.DataSink, parts: int | None = None, part_size: int | None = None) -> None:
504 async def download_file( 505 self, url: URL, sink: DataSink, 506 parts: int | None =None, 507 part_size : int | None = None) -> None: 508 509 try: 510 headers = await self.head(url=url) 511 except RepositoryClientError: 512 # The file is not available for HEAD. This is the case for S3 files 513 # where the file is a pre-signed request. We'll try to download the headers 514 # with a GET request with a range header containing only the first byte. 515 headers = await self.head(url=url, use_get=True) 516 517 size = 0 518 location = URL(headers.get('Location', url)) 519 520 if "Content-Length" in headers: 521 size = int(headers["Content-Length"]) 522 await sink.allocate(size) 523 524 if ( 525 size 526 and size > MINIMAL_DOWNLOAD_PART_SIZE 527 and any(x == "bytes" for x in headers.getall("Accept-Ranges")) 528 ): 529 await self._download_multipart(location, sink, size, parts, part_size) 530 else: 531 await self._download_single(location, sink)
class
Limiter(asyncio.locks.Semaphore):
14class Limiter(asyncio.Semaphore): 15 """A class to limit the number of simultaneous connections.""" 16 17 def __init__(self, capacity: int): 18 """Initialize the limiter. 19 20 :param capacity: the number of simultaneous connections 21 """ 22 self.capacity = capacity 23 super().__init__(capacity) 24 25 @property 26 def free(self) -> int: 27 """The number of free slots. 28 29 :return: the number of remaining connections 30 """ 31 return self._value
A class to limit the number of simultaneous connections.
Limiter(capacity: int)
17 def __init__(self, capacity: int): 18 """Initialize the limiter. 19 20 :param capacity: the number of simultaneous connections 21 """ 22 self.capacity = capacity 23 super().__init__(capacity)
Initialize the limiter.
Parameters
- capacity: the number of simultaneous connections