Metadata-Version: 2.4
Name: ikomia-client
Version: 0.3.0
Summary: Ikomia SCALE client library
Author-email: Ludovic Barusseau <ludovic.barusseau@ikomia.ai>, Loris Nezan <loris.nezan@ikomia.ai>
Classifier: Programming Language :: Python :: 3
Requires-Python: >=3.9
Requires-Dist: fsspec>=2025.3.2
Requires-Dist: httpx[http2]<1.0.0,>=0.27.0
Requires-Dist: numpy
Requires-Dist: pillow<11.0.0,>=10.2.0
Requires-Dist: yarl<2.0.0,>=1.8.1
Description-Content-Type: text/markdown

# ikomia-client

Python library that eases the use of Ikomia SCALE deployment endpoints.

## Installation

```bash
pip install ikclient
```

## Usage

Prerequisites:
 - Ikomia SCALE deployment URL
 - Valid API token

### Basic usage

The following example run on a `canny edge detector` workflow.  
`endpoint_url` is set as it appear on Ikomia SCALE webapp interface.

```python
from ikclient.core.client import Client

# Get client object
client = Client(endpoint_url, token=token)

# Run workflow on image
results = client.run("path/to/image.jpg")

output = results.get_output()
image = output.to_pil()
image.show()
```

### Async version

The client is also available in async mode.

```python
import asyncio
from ikclient.core.client import AsyncClient

async def call_endpoint(url, token=None):
    async with AsyncClient(url, token=token) as client:
        # Get results
        results = await client.run("path/to/image.jpg")

        return results.get_outputs()

asyncio.run(call_endpoint(endpoint_url))
```

## Advanced usage

### Using FSSPEC path

`client.run`, `client.run_on` and `client.run_task` all accept fsspec paths. You can fetch files from remote storage (S3, GCS, etc.) and use them as input for your deployment.

```python
client = Client(endpoint_url, token=token)

client.run("s3://bucket/path/to/image.jpg")
```

You can also configure the your file system backend by using `create_input` and setting the standard `storage_options` parameter:

```python
client = Client(endpoint_url, token=token)

client.run(
    client.create_input(
        "s3://bucket/path/to/image.jpg",
        storage_options={"client_kwargs": {"endpoint_url": "http://127.0.0.1:5555"}}
    )
)
```

### SCALE Storage API

The library also provides a client to interact with the SCALE Storage API which allows you to upload files to your deployment endpoint.

```python
from ikclient.core.client import Client

# Get client object
client = Client(endpoint_url, token=token)

# Upload to storage
with open("path/to/image.jpg", "rb") as f:
    uploaded_image = client.storage.put(f.read(), "image.jpg")

# Copy
copied_image = client.storage.copy(uploaded_image, "image_copy.jpg")

# Download from storage
with client.storage.read(copied_image) as f:
    with open("path/to/image_copy.jpg", "wb") as out:
        out.write(f.content)

# Create a pre-signed URL to download the file
url = client.storage.get_presigned_download_url(uploaded_image)

# Delete
client.storage.delete("image.jpg")
```

### Using SCALE Storage with deployments

You can use `create_storage_input` to create an input for a deployment from a file in the storage.

```python
client = Client(endpoint_url, token=token)

with open("path/to/image.jpg", "rb") as f:
    client.storage.put(f.read(), "image.jpg")

# Run workflow on uploaded image
results = client.run(client.create_storage_input("image.jpg"))
```

### Use Context to call endpoint

When you need to select specific tasks and outputs, or configure parameters, you can use the `Context` object.

```python
import asyncio
import os

from ikclient.core.client import AsyncClient
from ikclient.core.context import Context

async def call_endpoint(url):
    async with AsyncClient(url) as client:
        # Create a call context
        context = await client.build_context()

        # Set parameter for different tasks
        context.set_parameters("ocv_box_filter", {"kSizeHeight": 10, "kSizeWidth": 10})
        context.set_parameters("ocv_canny", {"threshold1": 100, "threshold2": 200})

        # Add wanted output
        context.add_output("ocv_box_filter", index=0)  # Will return first output from task 'ocv_box_filter'
        context.add_output("ocv_canny")  # Add also all outputs from task 'ocv_canny' (hint: there's only one output for this task)

        # Run call with context and get results
        results = await client.run_on(context, image_source_directory / "image.jpg")

        # Print how many items we get in results.
        print(len(results))  # Get '2', one for 'ocv_box_filter', one for 'ocv_canny'

        # Both output in results are images, display them
        for img in results:
            img.to_pil().show()

        # You can reuse previously crafted context to run on a lot of images
        for root, dirs, filenames in os.walk(lot_of_images_directory):
            for filename in filenames:
                results = await client.run_on(context, os.path.join(root, filename))
                for img in results:
                    img.to_pil().show()


asyncio.run(call_endpoint(endpoint_url))
```

### Get progress

When you need to give feedback on deployment call progress
```python
from ikclient.core.client import Client
from ikclient.core.results import Results

def on_progress(run_id: str = None, name: str = None, state: str = None, eta: Tuple[int, int] = None, uuid: str = None, results: Results = None):
    """
    Define a function that will receive progress informations.

    As whole arguments are keywords, you can only specify needed arguments, eg:
        def on_progress(state=None, results=None, **_):
            if state == "COMPLETED":
                results.save_outputs(image_destination_directory)

    Args:
        run_id: A unique id as str to identify run
        name: Workflow name
        state: Run state. Can be 'BLOCKING', 'SENDING', 'PENDING', 'STARTED', 'FAILURE' or 'COMPLETED'
        eta: A tuple of bounds of with estimated time of processing
        uuid: A deployment endpoint run unique id, FYI only
        results: When task is completed, contains results as Results object. None otherwise.
    """
    eta_lower, eta_upper = eta
    print(f"Call {run_id} is {state} and will be ready in less than {eta_upper} ms")


client = Client(endpoint_url)

# Call with on_progress function
results = client.run(image_source_directory / "image.jpg", on_progress=on_progress)
```

### Chain two (or more) calls

When you need to use a deployment call results as input for another deployment
```python
from ikclient.core.client import Client

# Here use context manager on Client for clarity, but can be declared as above exemples
with Client(first_endpoint_url) as client1, Client(second_endpoint_url) as client2:

    # Store results of first call
    intermediate_results = client1.run(image_source_directory / "image.jpg")

    # Use previous results as second endpoint call inputs and display final results
    final_results = client2.run(intermediate_results)
    for image in final_results:
        image.show()
```
