nrp_cmd.async_client.connection

Asynchronous connection for the NRP client.

 1#
 2# Copyright (C) 2024 CESNET z.s.p.o.
 3#
 4# invenio-nrp is free software; you can redistribute it and/or
 5# modify it under the terms of the MIT License; see LICENSE file for more
 6# details.
 7#
 8"""Asynchronous connection for the NRP client."""
 9
10from .connection import AsyncConnection
11from .limiter import Limiter
12
13__all__ = ("AsyncConnection", "Limiter")
class AsyncConnection:
100class AsyncConnection:
101    """Pre-configured asynchronous http connection."""
102
103    def __init__(
104        self,
105        *,
106        limiter: Limiter | None = None,
107        tokens: dict[URL, str] | None = None,
108        verify_tls: bool = True,
109        retry_count: int = 5,
110        retry_after_seconds: int = 1,
111    ):
112        """Create a new connection with the given configuration."""
113        self._limiter = limiter or Limiter(10)
114        self._verify_tls = verify_tls
115        self._retry_count = retry_count
116        self._retry_after_seconds = retry_after_seconds
117        
118        _tokens: list[BearerTokenForHost] = [
119            BearerTokenForHost(host_url=url, token=token)
120            for url, token in (tokens or {}).items()
121            if token
122        ]
123        self._auth = BearerAuthentication(_tokens)
124
125    @contextlib.asynccontextmanager
126    async def _client(
127        self, idempotent: bool = False
128    ) -> AsyncGenerator[HttpClient, None]:
129        """Create a new session with the repository and configure it with the token.
130
131        :return: A new http client
132        """
133        """
134        Create a new session with the repository and configure it with the token.
135        :return: A new http client
136        """
137
138        connector = TCPConnector(verify_ssl=self._verify_tls)
139        async with ClientSession(
140            request_class=AuthenticatedClientRequest,
141            response_class=RepositoryResponse,
142            connector=connector,
143        ) as session:
144            retry_client = RetryClient(
145                client_session=session,
146                retry_options=ServerAssistedRetry(
147                    attempts=self._retry_count,
148                    start_timeout=self._retry_after_seconds,
149                ),
150            )
151            yield retry_client
152
153    async def head(
154        self,
155        *,
156        url: URL,
157        use_get: bool = False,
158        **kwargs: Any,  # noqa: ANN401
159    ) -> CIMultiDictProxy[str]:
160        """Perform a HEAD request to the repository.
161
162        :param url:                 the url of the request
163        :param idempotent:          True if the request is idempotent, should be for HEAD requests
164        :param kwargs:              any kwargs to pass to the aiohttp client
165        :return:                    None
166
167        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
168        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
169        :raises RepositoryCommunicationError: if the request fails due to network
170        """
171        async def _head(response: ClientResponse) -> CIMultiDictProxy[str]:
172            return response.headers
173
174        if use_get:
175            return await self._retried("GET", url, _head, idempotent=True,
176                                       headers={"Range": "bytes=0-0"})
177        else:
178            return await self._retried("HEAD", url, _head, idempotent=True)
179
180    async def get[T](
181        self,
182        *,
183        url: URL,
184        result_class: type[T],
185        **kwargs: Any,  # noqa: ANN401
186    ) -> T:
187        """Perform a GET request to the repository.
188
189        :param url:                 the url of the request
190        :param idempotent:          True if the request is idempotent, should be for GET requests
191        :param result_class:        successful response will be parsed to this class
192        :param kwargs:              any kwargs to pass to the aiohttp client
193        :return:                    the parsed result
194
195        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
196        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
197        :raises RepositoryCommunicationError: if the request fails due to network error
198        """
199        return await self._retried(
200            "GET",
201            url,
202            partial(
203                self._get_call_result,
204                result_class=result_class,
205            ),
206            idempotent=True,
207            **kwargs
208        )
209
210    async def post[T](
211        self,
212        *,
213        url: URL,
214        json: dict[str, Any] | list[Any] | None = None,
215        data: bytes | None = None,
216        idempotent: bool = False,
217        result_class: type[T],
218        **kwargs: Any,  # noqa: ANN401
219    ) -> T:
220        """Perform a POST request to the repository.
221
222        :param url:                 the url of the request
223        :param json:                the json payload of the request (use exactly one of json or data)
224        :param data:                the data payload of the request
225        :param idempotent:          True if the request is idempotent, normally should be False
226        :param result_class:        successful response will be parsed to this class
227        :param kwargs:              any kwargs to pass to the aiohttp client
228        :return:                    the parsed result
229
230        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
231        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
232        :raises RepositoryCommunicationError: if the request fails due to network
233        """
234        assert (
235            json is not None or data is not None
236        ), "Either json or data must be provided"
237
238        return await self._retried(
239            "POST",
240            url,
241            partial(
242                self._get_call_result,
243                result_class=result_class,
244            ),
245            idempotent=idempotent,
246            json=json,
247            data=data,
248            **kwargs
249        )
250
251    async def put[T](
252        self,
253        *,
254        url: URL,
255        json: dict[str, Any] | list[Any] | None = None,
256        data: bytes | None = None,
257        result_class: type[T],
258        **kwargs: Any,  # noqa: ANN401
259    ) -> T:
260        """Perform a PUT request to the repository.
261
262        :param url:                     the url of the request
263        :param json:                    the json payload of the request (use exactly one of json or data)
264        :param data:                    the data payload of the request
265        :param result_class:            successful response will be parsed to this class
266        :param kwargs:                  any kwargs to pass to the aiohttp client
267        :return:                        the parsed result
268
269        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
270        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
271        :raises RepositoryCommunicationError: if the request fails due to network
272        """
273        assert (
274            json is not None or data is not None
275        ), "Either json or data must be provided"
276
277        return await self._retried(
278            "PUT",
279            url,
280            partial(
281                self._get_call_result,
282                result_class=result_class,
283            ),
284            idempotent=True,
285            json=json,
286            data=data,
287            **kwargs,
288        )
289
290    async def put_stream(
291        self,
292        *,
293        url: URL,
294        source: DataSource,
295        open_kwargs: dict[str, Any] | None = None,
296        **kwargs: Any,  # noqa: ANN401
297    ) -> ClientResponse:
298        """Perform a PUT request to the repository with a file.
299
300        :param url:                 the url of the request
301        :param file:                the file to send
302        :param kwargs:              any kwargs to pass to the aiohttp client
303        :return:                    the response (not parsed)
304
305        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
306        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
307        :raises RepositoryCommunicationError: if the request fails due to network
308        """
309
310        async def _put(response: ClientResponse) -> ClientResponse:
311            return response
312
313        return await self._retried(
314            "PUT",
315            url,
316            _put,
317            idempotent=True,
318            data=partial(source.open, **(open_kwargs or {})),
319            **kwargs
320        )
321
322    async def get_stream(
323        self,
324        *,
325        url: URL,
326        sink: DataSink,
327        offset: int = 0,
328        size: int | None = None,
329        **kwargs: Any,  # noqa: ANN401
330    ) -> None:
331        """Perform a GET request to the repository and write the response to a sink.
332
333        :param url:                 the url of the request
334        :param kwargs:              any kwargs to pass to the aiohttp client
335        :return:                    the parsed result
336
337        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
338        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
339        :raises RepositoryCommunicationError: if the request fails due to network error
340        """
341        
342        async def _copy_stream(response: ClientResponse) -> None:
343            chunk = await sink.open_chunk(offset=offset)
344            try:
345                async for data in response.content.iter_any():
346                    await chunk.write(data)
347            finally:
348                await chunk.close()
349        
350        if size is not None:
351            range_header = f"bytes={offset}-{offset + size - 1}"
352        else:
353            range_header = f"bytes={offset}-"
354        
355        await self._retried(
356            "GET",
357            url,
358            _copy_stream,
359            idempotent=True,
360            headers={"Range": range_header},
361            **kwargs
362        )
363
364    async def delete(
365        self,
366        *,
367        url: URL,
368        idempotent: bool = False,
369        **kwargs: Any,  # noqa: ANN401
370    ) -> None:
371        """Perform a DELETE request to the repository.
372
373        :param url:                 the url of the request
374        :param idempotent:          True if the request is idempotent, normally should be False
375        :param kwargs:              any kwargs to pass to the aiohttp client
376        :return:                    None
377
378        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
379        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
380        :raises RepositoryCommunicationError: if the request fails due to network
381        """
382        return await self._retried(
383            "DELETE",
384            url,
385            None,
386            idempotent=idempotent,
387            **kwargs
388        )
389
390    @overload
391    async def _get_call_result[T](
392        self,
393        response: ClientResponse,
394        result_class: type[T],
395    ) -> T: ...
396
397    @overload
398    async def _get_call_result(
399        self,
400        response: ClientResponse,
401        result_class: None,
402    ) -> None: ...
403
404    async def _get_call_result[T](
405        self,
406        response: ClientResponse,
407        result_class: type[T] | None,
408    ) -> T | None:
409        """Get the result from the response.
410
411        :param response:            the aiohttp response
412        :param result_class:        the class to parse the response to
413        :return:                    the parsed result
414
415        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
416        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
417        :raises RepositoryCommunicationError: if the request fails due to network
418        """
419        await response.raise_for_invenio_status()  # type: ignore
420        if response.status == 204:
421            assert result_class is None
422            return None
423        json_payload = await response.read()
424        assert result_class is not None
425        if issubclass(result_class, ClientResponse):
426            return cast(T, response)    # mypy can not get it
427        elif issubclass(result_class, str):
428            return cast(T, json_payload.decode('utf-8'))    # mypy can not get it
429        elif issubclass(result_class, dict):
430            return _json.loads(json_payload)
431        etag = remove_quotes(response.headers.get("ETag"))
432        return deserialize_rest_response(
433            self, communication_log, json_payload, result_class, etag
434        )
435
436    @overload
437    async def _retried[T](
438        self,
439        method: str,
440        url: URL,
441        callback: Callable[[ClientResponse], Awaitable[T]],
442        idempotent: bool,
443        **kwargs: Any,  # noqa: ANN401
444    ) -> T: ...
445    
446    @overload
447    async def _retried(
448        self,
449        method: str,
450        url: URL,
451        callback: None,
452        idempotent: bool,
453        **kwargs: Any,  # noqa: ANN401
454    ) -> None: ...
455
456    async def _retried[T](
457        self,
458        method: str,
459        url: URL,
460        callback: Callable[[ClientResponse], Awaitable[T]] | None,
461        idempotent: bool,
462        **kwargs: Any,  # noqa: ANN401
463    ) -> T | None:
464        """Log the start of a request and retry it if necessary."""
465        json = kwargs.get("json")
466        if json is not None and callable(json):
467            json = json()
468            kwargs["json"] = json
469
470        data = kwargs.get("data")
471
472        if communication_log.isEnabledFor(logging.INFO):
473            communication_log.info("%s %s", method.upper(), url)
474            if json is not None:
475                communication_log.info("%s", _json.dumps(json))
476            if data is not None:
477                communication_log.info("(stream)")
478
479        for attempt in try_until_success(
480            self._retry_count if idempotent else 1
481        ):
482            actual_data = None
483            if data is not None and callable(data):
484                actual_data = await data()
485                kwargs["data"] = actual_data
486            try:
487                async with (
488                    attempt,
489                    self._limiter,
490                    self._client(idempotent=True) as client,
491                    _cast_error(),
492                    client.request(method, url, auth=self._auth, **kwargs) as response,
493                ):
494                    await response.raise_for_invenio_status()  # type: ignore
495                    if callback is not None:
496                        return await callback(response)
497                    return None
498            finally:
499                if actual_data is not None and hasattr(actual_data, "close"):
500                    await actual_data.close()
501
502        raise Exception("unreachable")
503
504    async def download_file(
505        self, url: URL, sink: DataSink, 
506        parts: int | None =None, 
507        part_size : int | None = None) -> None:
508
509        try:
510            headers = await self.head(url=url)
511        except RepositoryClientError:
512            # The file is not available for HEAD. This is the case for S3 files
513            # where the file is a pre-signed request. We'll try to download the headers
514            # with a GET request with a range header containing only the first byte.
515            headers = await self.head(url=url, use_get=True)
516
517        size = 0
518        location = URL(headers.get('Location', url))
519        
520        if "Content-Length" in headers:
521            size = int(headers["Content-Length"])
522            await sink.allocate(size)
523
524        if (
525            size
526            and size > MINIMAL_DOWNLOAD_PART_SIZE
527            and any(x == "bytes" for x in headers.getall("Accept-Ranges"))
528        ):
529            await self._download_multipart(location, sink, size, parts, part_size)
530        else:
531            await self._download_single(location, sink)
532
533    async def _download_single(self, url: URL, sink: DataSink) -> None:
534        await self.get_stream(url=url, sink=sink, offset=0)
535
536    async def _download_multipart(
537        self, url: URL, sink: DataSink, size: int, parts: int | None = None, part_size: int | None = None
538    ) -> None:
539        adjusted_parts, adjusted_part_size = adjust_download_multipart_params(size, parts, part_size)
540
541        async with asyncio.TaskGroup() as tg:
542            for i in range(adjusted_parts):
543                start = i * adjusted_part_size
544                size = min((i + 1) * adjusted_part_size, size) - start
545                tg.create_task(
546                    self.get_stream(
547                        url=url, sink=sink, offset=start, size=size
548                    )
549                )

Pre-configured asynchronous http connection.

AsyncConnection( *, limiter: Limiter | None = None, tokens: dict[yarl.URL, str] | None = None, verify_tls: bool = True, retry_count: int = 5, retry_after_seconds: int = 1)
103    def __init__(
104        self,
105        *,
106        limiter: Limiter | None = None,
107        tokens: dict[URL, str] | None = None,
108        verify_tls: bool = True,
109        retry_count: int = 5,
110        retry_after_seconds: int = 1,
111    ):
112        """Create a new connection with the given configuration."""
113        self._limiter = limiter or Limiter(10)
114        self._verify_tls = verify_tls
115        self._retry_count = retry_count
116        self._retry_after_seconds = retry_after_seconds
117        
118        _tokens: list[BearerTokenForHost] = [
119            BearerTokenForHost(host_url=url, token=token)
120            for url, token in (tokens or {}).items()
121            if token
122        ]
123        self._auth = BearerAuthentication(_tokens)

Create a new connection with the given configuration.

async def head( self, *, url: yarl.URL, use_get: bool = False, **kwargs: Any) -> multidict._multidict.CIMultiDictProxy[str]:
153    async def head(
154        self,
155        *,
156        url: URL,
157        use_get: bool = False,
158        **kwargs: Any,  # noqa: ANN401
159    ) -> CIMultiDictProxy[str]:
160        """Perform a HEAD request to the repository.
161
162        :param url:                 the url of the request
163        :param idempotent:          True if the request is idempotent, should be for HEAD requests
164        :param kwargs:              any kwargs to pass to the aiohttp client
165        :return:                    None
166
167        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
168        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
169        :raises RepositoryCommunicationError: if the request fails due to network
170        """
171        async def _head(response: ClientResponse) -> CIMultiDictProxy[str]:
172            return response.headers
173
174        if use_get:
175            return await self._retried("GET", url, _head, idempotent=True,
176                                       headers={"Range": "bytes=0-0"})
177        else:
178            return await self._retried("HEAD", url, _head, idempotent=True)

Perform a HEAD request to the repository.

Parameters
  • url: the url of the request
  • use_get: if True, send a GET request with a "Range: bytes=0-0" header instead of a HEAD request
  • idempotent: not a parameter of this method; the request is always sent as idempotent
  • kwargs: any kwargs to pass to the aiohttp client
Returns
                the headers of the response
Raises
  • RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
  • RepositoryServerError: if the request fails due to server error (HTTP 5xx)
  • RepositoryCommunicationError: if the request fails due to network error
async def get(self, *, url: yarl.URL, result_class: type[T], **kwargs: Any) -> T:
180    async def get[T](
181        self,
182        *,
183        url: URL,
184        result_class: type[T],
185        **kwargs: Any,  # noqa: ANN401
186    ) -> T:
187        """Perform a GET request to the repository.
188
189        :param url:                 the url of the request
190        :param idempotent:          True if the request is idempotent, should be for GET requests
191        :param result_class:        successful response will be parsed to this class
192        :param kwargs:              any kwargs to pass to the aiohttp client
193        :return:                    the parsed result
194
195        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
196        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
197        :raises RepositoryCommunicationError: if the request fails due to network error
198        """
199        return await self._retried(
200            "GET",
201            url,
202            partial(
203                self._get_call_result,
204                result_class=result_class,
205            ),
206            idempotent=True,
207            **kwargs
208        )

Perform a GET request to the repository.

Parameters
  • url: the url of the request
  • idempotent: not a parameter of this method; GET requests are always sent as idempotent
  • result_class: successful response will be parsed to this class
  • kwargs: any kwargs to pass to the aiohttp client
Returns
                the parsed result
Raises
  • RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
  • RepositoryServerError: if the request fails due to server error (HTTP 5xx)
  • RepositoryCommunicationError: if the request fails due to network error
async def post( self, *, url: yarl.URL, json: dict[str, typing.Any] | list[typing.Any] | None = None, data: bytes | None = None, idempotent: bool = False, result_class: type[T], **kwargs: Any) -> T:
210    async def post[T](
211        self,
212        *,
213        url: URL,
214        json: dict[str, Any] | list[Any] | None = None,
215        data: bytes | None = None,
216        idempotent: bool = False,
217        result_class: type[T],
218        **kwargs: Any,  # noqa: ANN401
219    ) -> T:
220        """Perform a POST request to the repository.
221
222        :param url:                 the url of the request
223        :param json:                the json payload of the request (use exactly one of json or data)
224        :param data:                the data payload of the request
225        :param idempotent:          True if the request is idempotent, normally should be False
226        :param result_class:        successful response will be parsed to this class
227        :param kwargs:              any kwargs to pass to the aiohttp client
228        :return:                    the parsed result
229
230        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
231        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
232        :raises RepositoryCommunicationError: if the request fails due to network
233        """
234        assert (
235            json is not None or data is not None
236        ), "Either json or data must be provided"
237
238        return await self._retried(
239            "POST",
240            url,
241            partial(
242                self._get_call_result,
243                result_class=result_class,
244            ),
245            idempotent=idempotent,
246            json=json,
247            data=data,
248            **kwargs
249        )

Perform a POST request to the repository.

Parameters
  • url: the url of the request
  • json: the json payload of the request (use exactly one of json or data)
  • data: the data payload of the request
  • idempotent: True if the request is idempotent, normally should be False
  • result_class: successful response will be parsed to this class
  • kwargs: any kwargs to pass to the aiohttp client
Returns
                the parsed result
Raises
  • RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
  • RepositoryServerError: if the request fails due to server error (HTTP 5xx)
  • RepositoryCommunicationError: if the request fails due to network error
async def put( self, *, url: yarl.URL, json: dict[str, typing.Any] | list[typing.Any] | None = None, data: bytes | None = None, result_class: type[T], **kwargs: Any) -> T:
251    async def put[T](
252        self,
253        *,
254        url: URL,
255        json: dict[str, Any] | list[Any] | None = None,
256        data: bytes | None = None,
257        result_class: type[T],
258        **kwargs: Any,  # noqa: ANN401
259    ) -> T:
260        """Perform a PUT request to the repository.
261
262        :param url:                     the url of the request
263        :param json:                    the json payload of the request (use exactly one of json or data)
264        :param data:                    the data payload of the request
265        :param result_class:            successful response will be parsed to this class
266        :param kwargs:                  any kwargs to pass to the aiohttp client
267        :return:                        the parsed result
268
269        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
270        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
271        :raises RepositoryCommunicationError: if the request fails due to network
272        """
273        assert (
274            json is not None or data is not None
275        ), "Either json or data must be provided"
276
277        return await self._retried(
278            "PUT",
279            url,
280            partial(
281                self._get_call_result,
282                result_class=result_class,
283            ),
284            idempotent=True,
285            json=json,
286            data=data,
287            **kwargs,
288        )

Perform a PUT request to the repository.

Parameters
  • url: the url of the request
  • json: the json payload of the request (use exactly one of json or data)
  • data: the data payload of the request
  • result_class: successful response will be parsed to this class
  • kwargs: any kwargs to pass to the aiohttp client
Returns
                    the parsed result
Raises
  • RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
  • RepositoryServerError: if the request fails due to server error (HTTP 5xx)
  • RepositoryCommunicationError: if the request fails due to network error
async def put_stream( self, *, url: yarl.URL, source: nrp_cmd.async_client.streams.DataSource, open_kwargs: dict[str, typing.Any] | None = None, **kwargs: Any) -> aiohttp.client_reqrep.ClientResponse:
290    async def put_stream(
291        self,
292        *,
293        url: URL,
294        source: DataSource,
295        open_kwargs: dict[str, Any] | None = None,
296        **kwargs: Any,  # noqa: ANN401
297    ) -> ClientResponse:
298        """Perform a PUT request to the repository with a file.
299
300        :param url:                 the url of the request
301        :param file:                the file to send
302        :param kwargs:              any kwargs to pass to the aiohttp client
303        :return:                    the response (not parsed)
304
305        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
306        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
307        :raises RepositoryCommunicationError: if the request fails due to network
308        """
309
310        async def _put(response: ClientResponse) -> ClientResponse:
311            return response
312
313        return await self._retried(
314            "PUT",
315            url,
316            _put,
317            idempotent=True,
318            data=partial(source.open, **(open_kwargs or {})),
319            **kwargs
320        )

Perform a PUT request to the repository with a file.

Parameters
  • url: the url of the request
  • source: the data source providing the request body
  • open_kwargs: keyword arguments passed to source.open when the body is opened
  • kwargs: any kwargs to pass to the aiohttp client
Returns
                the response (not parsed)
Raises
  • RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
  • RepositoryServerError: if the request fails due to server error (HTTP 5xx)
  • RepositoryCommunicationError: if the request fails due to network error
async def get_stream( self, *, url: yarl.URL, sink: nrp_cmd.async_client.streams.DataSink, offset: int = 0, size: int | None = None, **kwargs: Any) -> None:
322    async def get_stream(
323        self,
324        *,
325        url: URL,
326        sink: DataSink,
327        offset: int = 0,
328        size: int | None = None,
329        **kwargs: Any,  # noqa: ANN401
330    ) -> None:
331        """Perform a GET request to the repository and write the response to a sink.
332
333        :param url:                 the url of the request
334        :param kwargs:              any kwargs to pass to the aiohttp client
335        :return:                    the parsed result
336
337        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
338        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
339        :raises RepositoryCommunicationError: if the request fails due to network error
340        """
341        
342        async def _copy_stream(response: ClientResponse) -> None:
343            chunk = await sink.open_chunk(offset=offset)
344            try:
345                async for data in response.content.iter_any():
346                    await chunk.write(data)
347            finally:
348                await chunk.close()
349        
350        if size is not None:
351            range_header = f"bytes={offset}-{offset + size - 1}"
352        else:
353            range_header = f"bytes={offset}-"
354        
355        await self._retried(
356            "GET",
357            url,
358            _copy_stream,
359            idempotent=True,
360            headers={"Range": range_header},
361            **kwargs
362        )

Perform a GET request to the repository and write the response to a sink.

Parameters
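  • url: the url of the request
  • sink: the sink the response body is written to
  • offset: the first byte to request; the sink chunk is opened at the same offset
  • size: the number of bytes to request; if omitted, everything from offset to the end is requested
  • kwargs: any kwargs to pass to the aiohttp client
Returns
                None (the response body is written to the sink)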
Raises
  • RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
  • RepositoryServerError: if the request fails due to server error (HTTP 5xx)
  • RepositoryCommunicationError: if the request fails due to network error
async def delete(self, *, url: yarl.URL, idempotent: bool = False, **kwargs: Any) -> None:
364    async def delete(
365        self,
366        *,
367        url: URL,
368        idempotent: bool = False,
369        **kwargs: Any,  # noqa: ANN401
370    ) -> None:
371        """Perform a DELETE request to the repository.
372
373        :param url:                 the url of the request
374        :param idempotent:          True if the request is idempotent, normally should be False
375        :param kwargs:              any kwargs to pass to the aiohttp client
376        :return:                    None
377
378        :raises RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
379        :raises RepositoryServerError: if the request fails due to server error (HTTP 5xx)
380        :raises RepositoryCommunicationError: if the request fails due to network
381        """
382        return await self._retried(
383            "DELETE",
384            url,
385            None,
386            idempotent=idempotent,
387            **kwargs
388        )

Perform a DELETE request to the repository.

Parameters
  • url: the url of the request
  • idempotent: True if the request is idempotent, normally should be False
  • kwargs: any kwargs to pass to the aiohttp client
Returns
                None
Raises
  • RepositoryClientError: if the request fails due to client passing incorrect parameters (HTTP 4xx)
  • RepositoryServerError: if the request fails due to server error (HTTP 5xx)
  • RepositoryCommunicationError: if the request fails due to network error
async def download_file( self, url: yarl.URL, sink: nrp_cmd.async_client.streams.DataSink, parts: int | None = None, part_size: int | None = None) -> None:
504    async def download_file(
505        self, url: URL, sink: DataSink, 
506        parts: int | None =None, 
507        part_size : int | None = None) -> None:
508
509        try:
510            headers = await self.head(url=url)
511        except RepositoryClientError:
512            # The file is not available for HEAD. This is the case for S3 files
513            # where the file is a pre-signed request. We'll try to download the headers
514            # with a GET request with a range header containing only the first byte.
515            headers = await self.head(url=url, use_get=True)
516
517        size = 0
518        location = URL(headers.get('Location', url))
519        
520        if "Content-Length" in headers:
521            size = int(headers["Content-Length"])
522            await sink.allocate(size)
523
524        if (
525            size
526            and size > MINIMAL_DOWNLOAD_PART_SIZE
527            and any(x == "bytes" for x in headers.getall("Accept-Ranges"))
528        ):
529            await self._download_multipart(location, sink, size, parts, part_size)
530        else:
531            await self._download_single(location, sink)
class Limiter(asyncio.locks.Semaphore):
class Limiter(asyncio.Semaphore):
    """Semaphore that caps the number of simultaneous connections.

    The configured size is kept in ``capacity``; ``free`` reports the current
    value of the underlying semaphore counter.
    """

    def __init__(self, capacity: int):
        """Create a limiter with ``capacity`` slots.

        :param capacity:    the number of simultaneous connections
        """
        super().__init__(capacity)
        # Remember the configured size; the semaphore only tracks what is left.
        self.capacity = capacity

    @property
    def free(self) -> int:
        """Return how many slots are currently available.

        :return:   the number of remaining connections
        """
        # ``_value`` is the counter maintained by ``asyncio.Semaphore``.
        available: int = self._value
        return available

A class to limit the number of simultaneous connections.

Limiter(capacity: int)
17    def __init__(self, capacity: int):
18        """Initialize the limiter.
19
20        :param capacity:    the number of simultaneous connections
21        """
22        self.capacity = capacity
23        super().__init__(capacity)

Initialize the limiter.

Parameters
  • capacity: the number of simultaneous connections
capacity
free: int
25    @property
26    def free(self) -> int:
27        """The number of free slots.
28
29        :return:   the number of remaining connections
30        """
31        return self._value

The number of free slots.

Returns

the number of remaining connections