```diff
--- test_files/411-original.txt	2025-03-07 19:06:54
+++ test_files/411-modified.txt	2025-03-07 19:06:54
@@ -2,37 +2,93 @@
 Composio server object collections
 """
 
-import base64
+import difflib
 import json
-import os
 import time
+import traceback
 import typing as t
 import warnings
+from concurrent.futures import Future, ThreadPoolExecutor
+from unittest import mock
 
 import pysher
+import requests
 import typing_extensions as te
-from pydantic import BaseModel, ConfigDict
-from pysher.channel import Channel
+from pydantic import BaseModel, ConfigDict, Field
+from pysher.channel import Channel as PusherChannel
+from pysher.connection import Connection as PusherConnection
 
-from composio.client.base import BaseClient, Collection
-from composio.client.endpoints import v1
-from composio.client.enums import Action, App, Tag, Trigger
-from composio.client.exceptions import ComposioClientError
-from composio.constants import PUSHER_CLUSTER, PUSHER_KEY
+from composio.client.base import Collection
+from composio.client.endpoints import v1, v2
+from composio.client.enums import (
+    Action,
+    ActionType,
+    App,
+    AppType,
+    Tag,
+    TagType,
+    Trigger,
+    TriggerType,
+)
+from composio.constants import DEFAULT_ENTITY_ID, PUSHER_CLUSTER, PUSHER_KEY
+from composio.exceptions import (
+    ErrorFetchingResource,
+    InvalidParams,
+    InvalidTriggerFilters,
+    SDKTimeoutError,
+    TriggerSubscriptionError,
+)
+from composio.utils import help_msg, logging
+from composio.utils.shared import generate_request_id
 
-from .local_handler import LocalToolHandler
 
+if t.TYPE_CHECKING:
+    from composio.client import Composio
+ALL_AUTH_SCHEMES = (
+    "OAUTH2",
+    "OAUTH1",
+    "API_KEY",
+    "BASIC",
+    "BEARER_TOKEN",
+    "BASIC_WITH_JWT",
+    "GOOGLE_SERVICE_ACCOUNT",
+    "GOOGLEADS_AUTH",
+    "NO_AUTH",
+    "COMPOSIO_LINK",
+    "CALCOM_AUTH",
+)
+AUTH_SCHEME_WITH_INITIATE = (
+    "OAUTH2",
+    "OAUTH1",
+    "API_KEY",
+    "BASIC",
+    "BEARER_TOKEN",
+    "BASIC_WITH_JWT",
+    "GOOGLE_SERVICE_ACCOUNT",
+    "GOOGLEADS_AUTH",
+    "COMPOSIO_LINK",
+    "CALCOM_AUTH",
+)
+AuthSchemeType = t.Literal[
+    "OAUTH2",
+    "OAUTH1",
+    "API_KEY",
+    "BASIC",
+    "BEARER_TOKEN",
+    "BASIC_WITH_JWT",
+    "GOOGLE_SERVICE_ACCOUNT",
+    "GOOGLEADS_AUTH",
+    "NO_AUTH",
+    "COMPOSIO_LINK",
+    "CALCOM_AUTH",
+]
 
-def trigger_names_str(
-    trigger_names: t.Union[t.List[str], t.List[Trigger], t.List[t.Union[str, Trigger]]],
+
+def to_trigger_names(
+    triggers: t.Union[t.List[str], t.List[Trigger], t.List[TriggerType]],
 ) -> str:
     """Get trigger names as a string."""
-    return ",".join(
-        [
-            trigger_name.event if isinstance(trigger_name, Trigger) else trigger_name
-            for trigger_name in trigger_names
-        ]
-    )
+    return ",".join([Trigger(trigger).slug for trigger in triggers])
 
 
 class AuthConnectionParamsModel(BaseModel):
@@ -45,6 +101,7 @@
     client_id: t.Optional[str] = None
     token_type: t.Optional[str] = None
     access_token: t.Optional[str] = None
+    refresh_token: t.Optional[str] = None
     client_secret: t.Optional[str] = None
     consumer_id: t.Optional[str] = None
     consumer_secret: t.Optional[str] = None
@@ -62,9 +119,12 @@
     createdAt: str
     updatedAt: str
     appUniqueId: str
+    appName: str
     integrationId: str
     connectionParams: AuthConnectionParamsModel
+
     clientUniqueUserId: t.Optional[str] = None
+    entityId: str = DEFAULT_ENTITY_ID
 
     # Override arbitrary model config.
     model_config: ConfigDict = ConfigDict(  # type: ignore
@@ -82,7 +142,7 @@
 
     def save_user_access_data(
         self,
-        client: BaseClient,
+        client: "Composio",
         field_inputs: t.Dict,
         redirect_url: t.Optional[str] = None,
         entity_id: t.Optional[str] = None,
@@ -104,8 +164,8 @@
 
     def wait_until_active(
         self,
-        client: BaseClient,
-        timeout=60,
+        client: "Composio",
+        timeout: float = 60.0,
     ) -> "ConnectedAccountModel":
         start_time = time.time()
         while time.time() - start_time < timeout:
@@ -116,12 +176,23 @@
                 return connection
             time.sleep(1)
 
-        # TODO: Replace with timeout error.
-        raise ComposioClientError(
+        raise SDKTimeoutError(
             "Connection did not become active within the timeout period."
         )
 
 
+class ConnectionAuthParam(BaseModel):
+    in_: str = Field(alias="in")
+    name: str
+    value: str
+
+
+class ConnectionParams(BaseModel):
+    body: t.Dict
+    base_url: str
+    parameters: t.List[ConnectionAuthParam]
+
+
 class ConnectedAccounts(Collection[ConnectedAccountModel]):
     """Collection of connected accounts."""
 
@@ -129,8 +200,16 @@
     endpoint = v1 / "connectedAccounts"
 
     @t.overload  # type: ignore
-    def get(self, connection_id: t.Optional[str] = None) -> ConnectedAccountModel:
+    def get(self) -> t.List[ConnectedAccountModel]:
         """
+        Get all connected accounts
+
+        :return: List of Connected accounts
+        """
+
+    @t.overload  # type: ignore
+    def get(self, connection_id: str) -> ConnectedAccountModel:
+        """
         Get an account by connection ID
 
         :param connection_id: ID of the connection to filter by
@@ -140,7 +219,7 @@
     @t.overload
     def get(
         self,
-        connection_id: t.Optional[str] = None,
+        *,
         entity_ids: t.Optional[t.Sequence[str]] = None,
         active: bool = False,
     ) -> t.List[ConnectedAccountModel]:
@@ -155,6 +234,7 @@
     def get(
         self,
         connection_id: t.Optional[str] = None,
+        *,
         entity_ids: t.Optional[t.Sequence[str]] = None,
         active: bool = False,
     ) -> t.Union[ConnectedAccountModel, t.List[ConnectedAccountModel]]:
@@ -169,8 +249,11 @@
         """
         entity_ids = entity_ids or ()
         if connection_id is not None and len(entity_ids) > 0:
-            raise ComposioClientError(
-                message="Cannot use both `connection_id` and `entity_ids` parameters as filter"
+            raise InvalidParams(
+                message=(
+                    "Cannot use both `connection_id` and `entity_ids` "
+                    "parameters as filter"
+                )
             )
 
         if connection_id is not None:
@@ -181,16 +264,16 @@
             )
             return self.model(**response.json())
 
-        quries = {}
+        queries = {"pageSize": "99999999"}
         if len(entity_ids) > 0:
-            quries["user_uuid"] = ",".join(entity_ids)
+            queries["user_uuid"] = ",".join(entity_ids)
 
         if active:
-            quries["showActiveOnly"] = "true"
+            queries["showActiveOnly"] = "true"
 
         response = self._raise_if_required(
             self.client.http.get(
-                url=str(self.endpoint(queries=quries)),
+                url=str(self.endpoint(queries=queries)),
             )
         )
         return [self.model(**account) for account in response.json().get("items", [])]
@@ -200,9 +283,10 @@
         integration_id: str,
         entity_id: t.Optional[str] = None,
         params: t.Optional[t.Dict] = None,
+        labels: t.Optional[t.List] = None,
         redirect_url: t.Optional[str] = None,
     ) -> ConnectionRequestModel:
-        """Initiate a new connected accont."""
+        """Initiate a new connected account."""
         response = self._raise_if_required(
             response=self.client.http.post(
                 url=str(self.endpoint),
@@ -210,33 +294,45 @@
                     "integrationId": integration_id,
                     "userUuid": entity_id,
                     "data": params or {},
+                    "labels": labels or [],
                     "redirectUri": redirect_url,
                 },
             )
         )
         return ConnectionRequestModel(**response.json())
 
+    def info(self, connection_id: str) -> ConnectionParams:
+        response = self._raise_if_required(
+            self.client.http.get(
+                url=str(self.endpoint / connection_id / "info"),
+            )
+        )
+        return ConnectionParams(**response.json())
 
+
 class AuthSchemeField(BaseModel):
     """Auth scheme field."""
 
     name: str
-    displayName: str
+    display_name: t.Optional[str] = None
     description: str
-    type: str
 
+    type: str
+    default: t.Optional[str] = None
     required: bool = False
     expected_from_customer: bool = True
 
+    get_current_user_endpoint: t.Optional[str] = None
 
+
 class AppAuthScheme(BaseModel):
     """App authenticatio scheme."""
 
     scheme_name: str
-    auth_mode: str
-    proxy: t.Dict
+    auth_mode: AuthSchemeType
     fields: t.List[AuthSchemeField]
 
+    proxy: t.Optional[t.Dict] = None
     authorization_url: t.Optional[str] = None
     token_url: t.Optional[str] = None
     default_scopes: t.Optional[t.List] = None
@@ -291,7 +387,6 @@
                     )
                 ).json()
             )
-
         return super().get(queries={})
 
 
@@ -303,7 +398,7 @@
     """Trigger payload property data model."""
 
     description: str
-    title: str
+    title: t.Optional[str] = None
     type: t.Optional[str] = None
     anyOf: t.Optional[t.List[TypeModel]] = None
 
@@ -314,7 +409,7 @@
     """Trigger payload data model."""
 
     properties: t.Dict[str, TriggerPayloadPropertyModel]
-    title: str
+    title: t.Optional[str] = None
     type: t.Optional[str] = None
     anyOf: t.Optional[t.List[TypeModel]] = None
 
@@ -326,16 +421,18 @@
 
     description: str
     title: str
-    type: str
+    default: t.Any = None
 
+    type: t.Optional[str] = None
 
+
 class TriggerConfigModel(BaseModel):
     """Trigger config data model."""
 
     properties: t.Dict[str, TriggerConfigPropertyModel]
     title: str
-    type: str
 
+    type: t.Optional[str] = None
     required: t.Optional[t.List[str]] = None
 
 
@@ -389,22 +486,19 @@
     logo: t.Optional[str] = None
 
 
-class ExecutionDetailsModel(BaseModel):
-    """Execution details data model."""
-
-    executed: bool
-
-
 class SuccessExecuteActionResponseModel(BaseModel):
     """Success execute action response data model."""
 
-    execution_details: ExecutionDetailsModel
-    response_data: str
+    successfull: bool
+    data: t.Dict
+    error: t.Optional[str] = None
 
 
-class FileModel(BaseModel):
-    name: str
-    content: bytes
+class FileType(BaseModel):
+    name: str = Field(
+        ..., description="File name, contains extension to indetify the file type"
+    )
+    content: str = Field(..., description="File content in base64")
 
 
 class Connection(BaseModel):
@@ -446,10 +540,10 @@
 class _TriggerEventFilters(te.TypedDict):
     """Trigger event filterset."""
 
-    app_name: te.NotRequired[str]
+    app_name: te.NotRequired[AppType]
     trigger_id: te.NotRequired[str]
     connection_id: te.NotRequired[str]
-    trigger_name: te.NotRequired[str]
+    trigger_name: te.NotRequired[TriggerType]
     entity_id: te.NotRequired[str]
     integration_id: te.NotRequired[str]
 
@@ -457,23 +551,127 @@
 TriggerCallback = t.Callable[[TriggerEventData], None]
 
 
-class TriggerSubscription:
+class TriggerSubscription(logging.WithLogger):
     """Trigger subscription."""
 
-    _channel: Channel
+    _pusher: pysher.Pusher
+    _channel: PusherChannel
+    _connection: PusherConnection
     _alive: bool
 
-    def __init__(self) -> None:
+    def __init__(self, client: "Composio") -> None:
         """Initialize subscription object."""
+        logging.WithLogger.__init__(self)
+        self.client = client
         self._alive = False
         self._chunks: t.Dict[str, t.Dict[int, str]] = {}
         self._callbacks: t.List[t.Tuple[TriggerCallback, _TriggerEventFilters]] = []
 
+    # pylint: disable=too-many-statements
+    def validate_filters(self, filters: _TriggerEventFilters):
+        docs_link_msg = "\nRead more here: https://docs.composio.dev/introduction/intro/quickstart_3"
+        if not isinstance(filters, dict):
+            raise InvalidParams("Expected filters to be a dictionary" + docs_link_msg)
+
+        expected_filters = list(_TriggerEventFilters.__annotations__)
+        for filter, value in filters.items():
+            if filter not in expected_filters:
+                error_msg = f"Unexpected filter {filter!r}"
+                possible_values = difflib.get_close_matches(
+                    filter, expected_filters, n=1
+                )
+                if possible_values:
+                    (possible_value,) = possible_values
+                    error_msg += f" Did you mean {possible_value!r}?"
+                raise InvalidTriggerFilters(error_msg + docs_link_msg)
+
+            # Validate app name
+            if filter == "app_name":
+                if isinstance(value, App):
+                    slug = value.slug
+                elif isinstance(value, str):
+                    slug = value
+                else:
+                    raise InvalidTriggerFilters(
+                        f"Expected 'app_name' to be App or str, found {value!r}"
+                        + docs_link_msg
+                    )
+
+                # Our enums are in uppercase but we accept lowercase ones too.
+                slug = slug.upper()
+
+                # Ensure the app exists
+                app_names = list(App.iter())
+                if slug not in app_names:
+                    error_msg = f"App {slug!r} does not exist."
+                    possible_values = difflib.get_close_matches(slug, app_names, n=1)
+                    if possible_values:
+                        (possible_value,) = possible_values
+                        error_msg += f" Did you mean {possible_value!r}?"
+
+                    raise InvalidTriggerFilters(error_msg + docs_link_msg)
+
+                # Ensure at least one of the app's triggers is enabled on the account.
+                active_triggers = [
+                    trigger.triggerName for trigger in self.client.active_triggers.get()
+                ]
+                apps_for_triggers = {
+                    Trigger(trigger).app.upper() for trigger in active_triggers
+                }
+                if slug not in apps_for_triggers:
+                    error_msg = (
+                        f"App {slug!r} has no triggers enabled on your account.\n"
+                        "Find the possible triggers by running `composio triggers`."
+                    )
+                    raise InvalidTriggerFilters(error_msg + docs_link_msg)
+
+            # Validate trigger name
+            if filter == "trigger_name":
+                if isinstance(value, Trigger):
+                    slug = value.slug
+                elif isinstance(value, str):
+                    slug = value
+                else:
+                    raise InvalidTriggerFilters(
+                        f"Expected 'trigger_name' to be Trigger or str, found {value!r}"
+                        + docs_link_msg
+                    )
+
+                # Our enums are in uppercase but we accept lowercase ones too.
+                slug = slug.upper()
+
+                # Ensure the trigger exists
+                trigger_names = list(Trigger.iter())
+                if slug not in trigger_names:
+                    error_msg = f"Trigger {slug!r} does not exist."
+                    possible_values = difflib.get_close_matches(
+                        slug, trigger_names, n=1
+                    )
+                    if possible_values:
+                        (possible_value,) = possible_values
+                        error_msg += f" Did you mean {possible_value!r}?"
+
+                    raise InvalidTriggerFilters(error_msg + docs_link_msg)
+
+                # Ensure the trigger is added on your account
+                active_triggers = [
+                    trigger.triggerName for trigger in self.client.active_triggers.get()
+                ]
+                if slug not in active_triggers:
+                    error_msg = (
+                        f"Trigger {slug!r} is not enabled on your account.\nEnable"
+                        f" the trigger by doing `composio triggers enable {slug}`."
+                    )
+                    raise InvalidTriggerFilters(error_msg + docs_link_msg)
+
     def callback(
         self,
         filters: t.Optional[_TriggerEventFilters] = None,
     ) -> t.Callable[[TriggerCallback], TriggerCallback]:
         """Register a trigger callaback."""
+        # Ensure filters is the right type before we stuff it in the callbacks
+        if filters is not None:
+            self.validate_filters(filters)
 
         def _wrap(f: TriggerCallback) -> TriggerCallback:
             self._callbacks.append((f, filters or {}))
@@ -481,27 +679,12 @@
 
         return _wrap
 
-    def _validate_filter(
-        self,
-        check: t.Any,
-        name: str,
-        filters: _TriggerEventFilters,
-    ) -> None:
-        """Check if filter is provided and raise if the values does not match."""
-        value = filters.get(name)
-        if value is None:
-            return
-        if value != check:
-            raise ValueError(
-                f"Skipping since `{name}` filter does not match the event",
-            )
-
     def _handle_callback(
         self,
         callback: TriggerCallback,
         data: TriggerEventData,
         filters: _TriggerEventFilters,
-    ) -> None:
+    ) -> t.Any:
         """Handle callback."""
         for name, check in (
             ("app_name", data.appName),
@@ -511,28 +694,57 @@
             ("entity_id", data.metadata.connection.clientUniqueUserId),
             ("integration_id", data.metadata.connection.integrationId),
         ):
-            self._validate_filter(
-                check=check,
-                name=name,
-                filters=filters,
+            value = filters.get(name)
+            if value is None or str(value).lower() == check.lower():
+                continue
+
+            self.logger.debug(
+                f"Skipping `{callback.__name__}` since "
+                f"`{name}` filter does not match the event metadata",
             )
-        callback(data)
+            return None
 
-    def handle_event(self, event: str) -> None:
-        """Filter events and call the callback function."""
         try:
-            data = TriggerEventData(**json.loads(event))
-        except Exception as e:
-            print(f"Error decoding payload: {e}")
+            return callback(data)
+        except BaseException:
+            self.logger.info(
+                f"Error executing `{callback.__name__}` for "
+                f"event `{data.metadata.triggerName}` "
+                f"with error:\n {traceback.format_exc()}"
+            )
+            return None
+
+    def _parse_payload(self, event: str) -> t.Optional[TriggerEventData]:
+        """Parse event payload."""
         try:
+            return TriggerEventData(**json.loads(event))
+        except Exception as e:
+            self.logger.warning(f"Error decoding payload: {e}")
+            return None
+
+    def handle_event(self, event: str) -> None:
+        """Filter events and call the callback function."""
+        data = self._parse_payload(event=event)
+        if data is None:
+            self.logger.error(f"Error parsing trigger payload: {event}")
+            return
+
+        self.logger.debug(
+            f"Received trigger event with trigger ID: {data.metadata.id} "
+            f"and trigger name: {data.metadata.triggerName}"
+        )
+        awaitables: t.List[Future] = []
+        with ThreadPoolExecutor() as executor:
             for callback, filters in self._callbacks:
-                self._handle_callback(
-                    callback=callback,
-                    data=data,
-                    filters=filters,
+                awaitables.append(
+                    executor.submit(
+                        self._handle_callback,
+                        callback,
+                        data,
+                        filters,
+                    )
                 )
-        except BaseException as e:
-            print(f"Erorr handling event `{data.metadata.id}`: {e}")
+        _ = [future.result() for future in awaitables]
 
     def handle_chunked_events(self, event: str) -> None:
         """Handle chunked events."""
@@ -551,25 +763,46 @@
         """Check if subscription is live."""
         return self._alive
 
+    def has_errored(self) -> bool:
+        """Check if the connection errored and disconnected."""
+        return self._connection.socket is None or self._connection.socket.has_errored
+
     def set_alive(self) -> None:
         """Set `_alive` to True."""
         self._alive = True
 
+    @te.deprecated("Use `wait_forever` instead")
     def listen(self) -> None:
         """Wait infinitely."""
-        while True:
+        self.wait_forever()
+
+    def wait_forever(self) -> None:
+        """Block until the subscription stops or its connection errors."""
+        while self.is_alive() and not self.has_errored():
             time.sleep(1)
 
+    def stop(self) -> None:
+        """Stop the trigger listener."""
+        self._connection.disconnect()
+        self._alive = False
 
-class _PusherClient:
+    def restart(self) -> None:
+        """Restart the subscription connection."""
+        self._connection.disconnect()
+        self._connection._connect()  # pylint: disable=protected-access
+
+
+class _PusherClient(logging.WithLogger):
     """Pusher client for Composio SDK."""
 
-    def __init__(self, client_id: str, base_url: str, api_key: str) -> None:
+    def __init__(self, client_id: str, client: "Composio") -> None:
         """Initialize pusher client."""
+        super().__init__()
         self.client_id = client_id
-        self.base_url = base_url
-        self.api_key = api_key
-        self.subscription = TriggerSubscription()
+        self.client = client
+        self.api_key = self.client.api_key
+        self.base_url = self.client.http.base_url
+        self.subscription = TriggerSubscription(client=self.client)
 
     def _get_connection_handler(
         self,
@@ -579,7 +812,7 @@
     ) -> t.Callable[[str], None]:
         def _connection_handler(_: str) -> None:
             channel = t.cast(
-                Channel,
+                PusherChannel,
                 pusher.subscribe(
                     channel_name=f"private-{client_id}_triggers",
                 ),
@@ -593,19 +826,48 @@
                 callback=subscription.handle_chunked_events,
             )
             subscription.set_alive()
+            subscription._channel = channel  # pylint: disable=protected-access
+            subscription._connection = (  # pylint: disable=protected-access
+                channel.connection
+            )
 
         return _connection_handler
 
     def connect(self, timeout: float = 15.0) -> TriggerSubscription:
         """Connect to Pusher channel for given client ID."""
+        # Make a request to the Pusher webhook endpoint
+        try:
+            response = requests.post(
+                url=f"{self.base_url}/v1/triggers/pusher",
+                json={
+                    "time": int(time.time() * 1000),  # Current time in milliseconds
+                    "events": [
+                        {
+                            "name": "channel_occupied",
+                            "channel": f"private-{self.client_id}_triggers",
+                        }
+                    ],
+                },
+                timeout=timeout,
+                headers={"Content-Type": "application/json"},
+            )
+            response.raise_for_status()
+        except requests.RequestException as e:
+            self.logger.error(f"Failed to send Pusher webhook: {e}")
+
         pusher = pysher.Pusher(
             key=PUSHER_KEY,
             cluster=PUSHER_CLUSTER,
             auth_endpoint=f"{self.base_url}/v1/client/auth/pusher_auth?fromPython=true",
             auth_endpoint_headers={
                 "x-api-key": self.api_key,
+                "x-request-id": generate_request_id(),
             },
+            auto_sub=True,
         )
+
+        # Silence the pusher connection's logging by swapping in a no-op mock logger
+        pusher.connection.logger = mock.MagicMock()  # type: ignore
         pusher.connection.bind(
             "pusher:connection_established",
             self._get_connection_handler(
@@ -620,9 +882,11 @@
         deadline = time.time() + timeout
         while time.time() < deadline:
             if self.subscription.is_alive():
+                self.subscription._pusher = pusher  # pylint: disable=protected-access
                 return self.subscription
             time.sleep(0.5)
-        raise TimeoutError(
+
+        raise SDKTimeoutError(
             "Timed out while waiting for trigger listener to be established"
         )
 
@@ -634,7 +898,7 @@
     endpoint = v1.triggers
     callbacks: CallbackCollection
 
-    def __init__(self, client: BaseClient) -> None:
+    def __init__(self, client: "Composio") -> None:
         """Initialize triggers collections."""
         super().__init__(client)
         self.callbacks = CallbackCollection(
@@ -643,10 +907,8 @@
 
     def get(  # type: ignore
         self,
-        trigger_names: t.Optional[
-            t.Union[t.List[str], t.List[Trigger], t.List[t.Union[str, Trigger]]]
-        ] = None,
-        app_names: t.Optional[t.List[str]] = None,
+        trigger_names: t.Optional[t.List[TriggerType]] = None,
+        apps: t.Optional[t.List[str]] = None,
     ) -> t.List[TriggerModel]:
         """
         List active triggers
@@ -657,9 +919,9 @@
         """
         queries = {}
         if trigger_names is not None and len(trigger_names) > 0:
-            queries["triggerIds"] = trigger_names_str(trigger_names)
-        if app_names is not None and len(app_names) > 0:
-            queries["appNames"] = ",".join(app_names)
+            queries["triggerIds"] = to_trigger_names(trigger_names)
+        if apps is not None and len(apps) > 0:
+            queries["appNames"] = ",".join(apps)
         return super().get(queries=queries)
 
     def enable(
@@ -696,8 +958,20 @@
         )
         return response.json()
 
+    def delete(self, id: str) -> t.Dict:
+        """
+        Delete a trigger
+
+        :param id: ID of the trigger to be deleted
+        """
+        response = self._raise_if_required(
+            self.client.http.delete(url=str(self.endpoint / "instance" / id))
+        )
+        return response.json()
+
     def subscribe(self, timeout: float = 15.0) -> TriggerSubscription:
         """Subscribe to a trigger and receive trigger events."""
+        self.logger.debug("Creating trigger subscription")
         response = self._raise_if_required(
             response=self.client.http.get(
                 url="/v1/client/auth/client_info",
@@ -705,12 +979,11 @@
         )
         client_id = response.json().get("client", {}).get("id")
         if client_id is None:
-            raise ComposioClientError("Error fetching client ID")
+            raise TriggerSubscriptionError("Error fetching client ID")
 
         pusher = _PusherClient(
             client_id=client_id,
-            base_url=self.client.http.base_url,
-            api_key=self.client.api_key,
+            client=self.client,
         )
         return pusher.connect(
             timeout=timeout,
@@ -754,99 +1027,131 @@
         if len(integration_ids) > 0:
             queries["integrationIds"] = ",".join(integration_ids)
         if len(trigger_names) > 0:
-            queries["triggerNames"] = trigger_names_str(trigger_names)
-        return self._raise_if_empty(super().get(queries=queries))
+            queries["triggerNames"] = to_trigger_names(trigger_names)
+        return super().get(queries=queries)
 
 
-class ActionParameterPropertyModel(BaseModel):
-    """Action parameter data model."""
-
-    examples: t.Optional[t.List] = None
-    description: t.Optional[str] = None
-    title: t.Optional[str] = None
-    type: t.Optional[str] = None
-    oneOf: t.Optional[t.List["ActionParameterPropertyModel"]] = None
-    file_readable: t.Optional[bool] = False
-
-
-class ActionParametersModel(BaseModel):
-    """Action parameter data models."""
-
-    properties: t.Dict[str, ActionParameterPropertyModel]
+class OpenAPISchema(BaseModel):
+    properties: t.Dict[str, t.Any]
     title: str
     type: str
-
     required: t.Optional[t.List[str]] = None
+    examples: t.Optional[t.List[t.Any]] = None
 
 
-class ActionResponsePropertyModel(BaseModel):
-    """Action response data model."""
+class ActionParametersModel(OpenAPISchema):
+    """Action parameter data models."""
 
-    description: t.Optional[str] = None
-    examples: t.Optional[t.List] = None
-    title: t.Optional[str] = None
-    type: t.Optional[str] = None
 
-
-class ActionResponseModel(BaseModel):
+class ActionResponseModel(OpenAPISchema):
     """Action response data model."""
 
-    properties: t.Dict[str, ActionResponsePropertyModel]
-    title: str
-    type: str
 
-    required: t.Optional[t.List[str]] = None
-
-
 class ActionModel(BaseModel):
     """Action data model."""
 
     name: str
-    display_name: str
-    description: t.Optional[str]
+    description: str
     parameters: ActionParametersModel
     response: ActionResponseModel
-    appKey: str
-    appId: str
-    tags: t.List[str]
     appName: str
-    enabled: bool
+    appId: str
+    version: str
+    available_versions: t.List[str]
 
+    tags: t.List[str]
     logo: t.Optional[str] = None
 
+    display_name: t.Optional[str] = None
+    enabled: bool = False
 
+
+ParamPlacement = t.Literal["header", "path", "query", "subdomain", "metadata"]
+
+
+class CustomAuthParameter(te.TypedDict):
+    in_: ParamPlacement
+    name: str
+    value: str
+
+
+class CustomAuthObject(BaseModel):
+    body: t.Dict = Field(default_factory=lambda: {})
+    base_url: t.Optional[str] = None
+    parameters: t.List[CustomAuthParameter] = Field(default_factory=lambda: [])
+
+
+class SearchResultTask(BaseModel):
+
+    app: str = Field(
+        description="Name of the app required to perform the subtask.",
+    )
+    actions: list[str] = Field(
+        description=(
+            "List of possible actions in order of relevance that can be used to "
+            "perform the task, provide minimum of {-min_actions-} and maximum of "
+            "{-max_actions-} actions."
+        ),
+    )
+    description: str = Field(
+        description="Descrption of the subtask.",
+    )
+    order: int = Field(
+        description="Order of the subtask, SHOULD START FROM 0",
+    )
+
+
+class CreateUploadURLResponse(BaseModel):
+    id: str = Field(..., description="ID of the file")
+    url: str = Field(..., description="Onetime upload URL")
+    key: str = Field(..., description="S3 upload location")
+    exists: bool = Field(False, description="If the file already exists on S3")
+
+
 class Actions(Collection[ActionModel]):
     """Collection of composio actions.."""
 
     model = ActionModel
-    endpoint = v1.actions
-    local_handler = LocalToolHandler()
+    endpoint = v2.actions
 
-    # TODO: Overload
-    def get(  # type: ignore
+    def _get_action(self, action: ActionType) -> ActionModel:
+        return self.model(
+            **self._raise_if_required(
+                response=self.client.http.get(
+                    url=str(self.endpoint / str(action)),
+                    params={
+                        "version": Action(action).version,
+                    },
+                )
+            ).json()
+        )
+
+    def _get_actions(
         self,
-        actions: t.Optional[t.Sequence[Action]] = None,
-        apps: t.Optional[t.Sequence[App]] = None,
-        tags: t.Optional[t.Sequence[t.Union[str, Tag]]] = None,
+        actions: t.Optional[t.Sequence[ActionType]] = None,
+        apps: t.Optional[t.Sequence[AppType]] = None,
+        tags: t.Optional[t.Sequence[TagType]] = None,
         limit: t.Optional[int] = None,
         use_case: t.Optional[str] = None,
         allow_all: bool = False,
     ) -> t.List[ActionModel]:
-        """
-        Get a list of apps by the specified filters.
 
-        :param actions: Filter by the list of Actions.
-        :param apps: Filter by the list of Apps.
-        :param tags: Filter by the list of given Tags.
-        :param limit: Limit the numnber of actions to a specific number.
-        :param use_case: Filter by use case.
-        :param allow_all: Allow querying all of the actions for a specific
-                        app
-        :return: List of actions
-        """
-        actions = actions or []
-        apps = apps or []
-        tags = tags or []
+        def is_action(obj):
+            try:
+                return hasattr(obj, "app")
+            except AttributeError:
+                return False
+
+        actions = t.cast(
+            t.List[Action],
+            [
+                action if is_action(action) else Action(action)
+                for action in actions or []
+            ],
+        )
+        apps = t.cast(t.List[App], [App(app) for app in apps or []])
+        tags = t.cast(t.List[Tag], [Tag(tag) for tag in tags or []])
+
         # Filter out local apps and actions
         local_apps = [app for app in apps if app.is_local]
         local_actions = [action for action in actions if action.is_local]
@@ -858,29 +1163,30 @@
             and (len(local_apps) > 0 or len(local_actions) > 0)
         )
         if only_local_apps:
-            local_items = self.local_handler.get_list_of_action_schemas(
-                apps=local_apps, actions=local_actions, tags=tags
+            local_items = self.client.local.get_action_schemas(
+                apps=local_apps,
+                actions=local_actions,
+                tags=tags,
             )
             return [self.model(**item) for item in local_items]
 
         if len(actions) > 0 and len(apps) > 0:
-            raise ComposioClientError(
+            raise ErrorFetchingResource(
                 "Error retrieving Actions, Both actions and apps "
                 "cannot be used as filters at the same time."
             )
 
         if len(actions) > 0 and len(tags) > 0:
-            raise ComposioClientError(
+            raise ErrorFetchingResource(
                 "Error retrieving Actions, Both actions and tags "
                 "cannot be used as filters at the same time."
             )
 
         if len(apps) > 0 and len(tags) == 0 and not allow_all:
             warnings.warn(
-                "Using all the actions of an app is not recommended. "
-                "Please use tags to filter actions or provide specific actions. "
-                "We just pass the important actions to the agent, but this is not meant "
-                "to be used in production. Check out https://docs.composio.dev/sdk/python/actions for more information.",
+                "Using all actions of an app is not recommended for production."
+                "Learn more: https://docs.composio.dev/patterns/tools/use-tools/use-specific-actions\n\n"
+                + help_msg(),
                 UserWarning,
             )
             tags = ["important"]
@@ -902,31 +1208,34 @@
 
         queries: t.Dict[str, str] = {}
         if use_case is not None and use_case != "":
-            if len(apps) != 1:
-                raise ComposioClientError(
-                    "Error retrieving Actions, Use case "
-                    "should be provided with exactly one app."
-                )
             queries["useCase"] = use_case
 
         if len(apps) > 0:
-            queries["appNames"] = ",".join(list(map(lambda x: x.value, apps)))
+            queries["apps"] = ",".join(list(map(lambda x: t.cast(App, x).slug, apps)))
 
         if len(actions) > 0:
-            queries["appNames"] = ",".join(set(map(lambda x: x.app, actions)))
+            queries["apps"] = ",".join(
+                set(map(lambda x: t.cast(Action, x).app, actions))
+            )
 
         if limit is not None:
             queries["limit"] = str(limit)
+
         response = self._raise_if_required(
             response=self.client.http.get(
-                url=str(self.endpoint(queries=queries)),
+                url=str(
+                    self.endpoint(
+                        queries=queries,
+                    )
+                )
             )
         )
+
         response_json = response.json()
         items = [self.model(**action) for action in response_json.get("items")]
         if len(actions) > 0:
-            required_triggers = [action.action for action in actions]
-            items = [item for item in items if item.name in required_triggers]
+            required = [t.cast(Action, action).slug for action in actions]
+            items = [item for item in items if item.name in required]
 
         if len(tags) > 0:
             required_tags = [tag.app if isinstance(tag, Tag) else tag for tag in tags]
@@ -942,19 +1251,93 @@
                     items = filtered_items
 
         if len(local_apps) > 0 or len(local_actions) > 0:
-            local_items = self.local_handler.get_list_of_action_schemas(
+            local_items = self.client.local.get_action_schemas(
                 apps=local_apps, actions=local_actions, tags=tags
             )
             items = [self.model(**item) for item in local_items] + items
         return items
 
+    @t.overload  # type: ignore
+    def get(self) -> t.List[ActionModel]: ...
+
+    @t.overload  # type: ignore
+    def get(self, action: t.Optional[ActionType] = None) -> ActionModel: ...
+
+    @t.overload  # type: ignore
+    def get(
+        self,
+        actions: t.Optional[t.Sequence[ActionType]] = None,
+        apps: t.Optional[t.Sequence[AppType]] = None,
+        tags: t.Optional[t.Sequence[TagType]] = None,
+        limit: t.Optional[int] = None,
+        use_case: t.Optional[str] = None,
+        allow_all: bool = False,
+    ) -> t.List[ActionModel]: ...
+
+    def get(  # type: ignore
+        self,
+        action: t.Optional[ActionType] = None,
+        *,
+        actions: t.Optional[t.Sequence[ActionType]] = None,
+        apps: t.Optional[t.Sequence[AppType]] = None,
+        tags: t.Optional[t.Sequence[TagType]] = None,
+        limit: t.Optional[int] = None,
+        use_case: t.Optional[str] = None,
+        allow_all: bool = False,
+    ) -> t.Union[ActionModel, t.List[ActionModel]]:
+        """
+        Get a single action, or a list of actions by the specified filters.
+
+        :param actions: Filter by the list of Actions.
+        :param action: Get data for this action.
+        :param apps: Filter by the list of Apps.
+        :param tags: Filter by the list of given Tags.
+        :param limit: Limit the number of actions to a specific number.
+        :param use_case: Filter by use case.
+        :param allow_all: Allow querying all of the actions for a specific
+                        app
+        :return: A single action if `action` is given, else a list of actions
+        """
+        if action is not None:
+            return self._get_action(action=action)
+
+        return self._get_actions(
+            actions=actions,
+            apps=apps,
+            tags=tags,
+            limit=limit,
+            use_case=use_case,
+            allow_all=allow_all,
+        )
+
+    @staticmethod
+    def _serialize_auth(auth: t.Optional[CustomAuthObject]) -> t.Optional[t.Dict]:
+        if auth is None:
+            return None
+
+        data = auth.model_dump(exclude_none=True)
+        data["parameters"] = [
+            {"in": d["in_"], "name": d["name"], "value": d["value"]}
+            for d in data["parameters"]
+        ]
+        for param in data["parameters"]:
+            if param["in"] == "metadata":
+                raise InvalidParams(
+                    "Param placement cannot be 'metadata' for remote "
+                    f"action execution: {param}"
+                )
+        return data
+
     def execute(
         self,
         action: Action,
         params: t.Dict,
-        entity_id: str,
+        entity_id: str = "default",
         connected_account: t.Optional[str] = None,
+        session_id: t.Optional[str] = None,
         text: t.Optional[str] = None,
+        auth: t.Optional[CustomAuthObject] = None,
+        allow_tracing: bool = False,
     ) -> t.Dict:
         """
         Execute an action on the specified entity with optional connected account.
@@ -963,69 +1346,149 @@
         :param params: A dictionary of parameters to be passed to the action.
         :param entity_id: The unique identifier of the entity on which the action is executed.
         :param connected_account: Optional connected account ID if required for the action.
+        :param session_id: ID of the current workspace session
         :return: A dictionary containing the response from the executed action.
         """
-        if action.is_local:
-            return self.local_handler.execute_local_action(
-                action=action,
-                request_data=params,
-            )
-        actions = self.get(
-            actions=[action],
-        )
-        if len(actions) == 0:
-            raise ComposioClientError(f"Action {action} not found")
-
-        (action_model,) = actions
-        action_req_schema = action_model.parameters.properties
-        modified_params = {}
-        for param, value in params.items():
-            file_readable = action_req_schema[param].file_readable or False
-            if file_readable and isinstance(value, str) and os.path.isfile(value):
-                with open(value, "rb") as file:
-                    file_content = file.read()
-                    try:
-                        modified_params[param] = file_content.decode("utf-8")
-                    except UnicodeDecodeError:
-                        # If decoding fails, treat as binary and encode in base64
-                        modified_params[param] = base64.b64encode(file_content).decode(
-                            "utf-8"
-                        )
-            else:
-                modified_params[param] = value
-
         if action.no_auth:
             return self._raise_if_required(
-                self.client.http.post(
-                    url=str(self.endpoint / action.action / "execute"),
+                self.client.long_timeout_http.post(
+                    url=str(self.endpoint / action.slug / "execute"),
                     json={
                         "appName": action.app,
-                        "input": modified_params,
-                        "entityId": entity_id,
+                        "input": params,
                         "text": text,
+                        "version": action.version,
+                        "sessionInfo": {
+                            "sessionId": session_id,
+                        },
+                        "allowTracing": allow_tracing,
                     },
                 )
             ).json()
 
-        if connected_account is None:
-            raise ComposioClientError(
+        if connected_account is None and auth is None:
+            raise InvalidParams(
                 "`connected_account` cannot be `None` when executing "
                 "an app which requires authentication"
             )
 
         return self._raise_if_required(
-            self.client.http.post(
-                url=str(self.endpoint / action.action / "execute"),
+            self.client.long_timeout_http.post(
+                url=str(self.endpoint / action.slug / "execute"),
                 json={
                     "connectedAccountId": connected_account,
-                    "input": modified_params,
                     "entityId": entity_id,
+                    "appName": action.app,
+                    "input": params,
                     "text": text,
+                    "version": action.version,
+                    "authConfig": self._serialize_auth(auth=auth),
+                    "sessionInfo": {
+                        "sessionId": session_id,
+                    },
+                    "allowTracing": allow_tracing,
                 },
             )
         ).json()
 
+    def request(
+        self,
+        connection_id: str,
+        endpoint: str,
+        method: str,
+        body: t.Optional[t.Dict] = None,
+        parameters: t.Optional[t.List[CustomAuthParameter]] = None,
+    ) -> t.Dict:
+        return self.client.http.post(
+            url=str(self.endpoint / "proxy"),
+            json={
+                "connectedAccountId": connection_id,
+                "body": body,
+                "method": method.upper(),
+                "endpoint": endpoint,
+                "parameters": [
+                    {
+                        "in": param["in_"],
+                        "name": param["name"],
+                        "value": param["value"],
+                    }
+                    for param in parameters or []
+                ],
+            },
+        ).json()
 
+    def search_for_a_task(
+        self,
+        use_case: str,
+        limit: t.Optional[int] = None,
+        min_actions_per_task: t.Optional[int] = None,
+        max_actions_per_task: t.Optional[int] = None,
+        apps: t.Optional[t.List[str]] = None,
+    ) -> t.List[SearchResultTask]:
+        params: t.Dict[str, t.Any] = {"useCase": use_case}
+        if limit is not None:
+            params["limit"] = limit
+
+        if min_actions_per_task is not None:
+            params["minActionsPerTask"] = min_actions_per_task
+
+        if max_actions_per_task is not None:
+            params["maxActionsPerTask"] = max_actions_per_task
+
+        if apps is not None:
+            params["apps"] = ",".join(apps)
+
+        response = self._raise_if_required(
+            response=self.client.http.get(
+                str(self.endpoint / "search" / "advanced"),
+                params=params,
+            )
+        )
+
+        return [
+            SearchResultTask.model_validate(task)
+            for task in response.json().get("items", [])
+        ]
+
+    def create_file_upload(
+        self,
+        app: str,
+        action: str,
+        filename: str,
+        mimetype: str,
+        md5: str,
+    ) -> CreateUploadURLResponse:
+        return CreateUploadURLResponse(
+            **self._raise_if_required(
+                response=self.client.http.post(
+                    url=str(self.endpoint / "files" / "upload" / "request"),
+                    json={
+                        "md5": md5,
+                        "app": app,
+                        "action": action,
+                        "filename": filename,
+                        "mimetype": mimetype,
+                    },
+                )
+            ).json()
+        )
+
+
+class ExpectedFieldInput(BaseModel):
+    name: str
+    type: str
+
+    description: str
+    displayName: str
+    is_secret: bool = False
+
+    required: bool = True
+    expected_from_customer: bool = True
+
+    default: t.Optional[str] = None
+    get_current_user_endpoint: t.Optional[str] = None
+
+
 class IntegrationModel(BaseModel):
     """Integration data model."""
 
@@ -1037,9 +1500,11 @@
     enabled: bool
     deleted: bool
     appId: str
-    _count: t.Dict
     appName: str
+    expectedInputFields: t.List[ExpectedFieldInput] = Field(default_factory=lambda: [])
 
+    _count: t.Dict
+
     logo: t.Optional[str] = None
     defaultConnectorId: t.Optional[str] = None
     connections: t.Optional[t.List[t.Dict]] = None
@@ -1057,9 +1522,10 @@
         self,
         app_id: str,
         name: t.Optional[str] = None,
-        auth_mode: t.Optional[str] = None,
+        auth_mode: t.Optional["AuthSchemeType"] = None,
         auth_config: t.Optional[t.Dict[str, t.Any]] = None,
         use_composio_auth: bool = False,
+        force_new_integration: bool = False,
     ) -> IntegrationModel:
         """
         Create a new integration
@@ -1081,8 +1547,13 @@
 
         if auth_mode is not None:
             request["authScheme"] = auth_mode
+
+        if auth_config is not None:
             request["authConfig"] = auth_config or {}
 
+        if force_new_integration:
+            request["forceNewIntegration"] = force_new_integration
+
         response = self._raise_if_required(
             response=self.client.http.post(
                 url=str(self.endpoint),
@@ -1090,3 +1561,91 @@
             )
         )
         return IntegrationModel(**response.json())
+
+    def remove(self, id: str) -> None:
+        self.client.http.delete(url=str(self.endpoint / id))
+
+    @t.overload  # type: ignore
+    def get(
+        self,
+        *,
+        page_size: t.Optional[int] = None,
+        page: t.Optional[int] = None,
+        app_id: t.Optional[str] = None,
+        app_name: t.Optional[str] = None,
+        show_disabled: t.Optional[bool] = None,
+    ) -> t.List[IntegrationModel]: ...
+
+    @t.overload
+    def get(self, id: t.Optional[str] = None) -> IntegrationModel: ...
+
+    def get(
+        self,
+        id: t.Optional[str] = None,
+        *,
+        page_size: t.Optional[int] = None,
+        page: t.Optional[int] = None,
+        app_id: t.Optional[str] = None,
+        app_name: t.Optional[str] = None,
+        show_disabled: t.Optional[bool] = None,
+    ) -> t.Union[t.List[IntegrationModel], IntegrationModel]:
+        if id is not None:
+            return IntegrationModel(
+                **self._raise_if_required(
+                    self.client.http.get(url=str(self.endpoint / id))
+                ).json()
+            )
+        quries = {}
+        if page_size is not None:
+            quries["pageSize"] = json.dumps(page_size)
+
+        if page is not None:
+            quries["page"] = json.dumps(page)
+
+        if app_id is not None:
+            quries["appId"] = app_id
+
+        if app_name is not None:
+            quries["appName"] = app_name
+
+        if show_disabled is not None:
+            quries["showDisabled"] = json.dumps(show_disabled)
+
+        return super().get(queries=quries)
+
+    @te.deprecated("`get_id` is deprecated, use `get(id=id)`")
+    def get_by_id(
+        self,
+        integration_id: str,
+    ) -> IntegrationModel:
+        """
+        Get an integration by its ID.
+
+        :param integration_id: Integration ID string.
+        :return: Integration model.
+        """
+        response = self._raise_if_required(
+            self.client.http.get(url=str(self.endpoint / integration_id))
+        )
+        return IntegrationModel(**response.json())
+
+
+class LogRecord(BaseModel):
+    pass
+
+
+class Logs(Collection[LogRecord]):
+    """
+    Logs endpoint.
+    """
+
+    model = LogRecord
+    endpoint = v1.logs
+
+    def push(self, record: t.Dict) -> None:
+        """Push logs to composio."""
+        # TODO: handle a missing API key better; the record is silently dropped
+        if self.client._api_key is None:  # pylint: disable=protected-access
+            return
+
+        self.client.http.post(url=str(self.endpoint), json=record)
```
