"""
Weex Sync Client - Synchronous wrapper for async Weex client.
Provides a blocking interface for users who prefer synchronous code.
"""
from __future__ import annotations
import asyncio
import threading
from typing import Any, Self
import structlog
from .client import ResponseData, WeexAsyncClient
from .config import WeexConfig
from .models import PlaceOrderRequest
from .types import ClientOrderId, OrderId, Symbol, Timeout
# Module-level structured logger used by WeexSyncClient below.
logger = structlog.get_logger()
class WeexSyncClient:
    """
    Synchronous wrapper for WeexAsyncClient.

    Provides a blocking interface while maintaining the benefits
    of the async implementation internally. Coroutines created by the
    wrapped async client are executed on a private asyncio event loop that
    runs in a daemon thread; the thread is started lazily by the first
    request, and every public method blocks the caller until its coroutine
    has finished.

    The client is a context manager: leaving the ``with`` block calls
    ``close()``. Once closed, the client cannot be reused.
    """

    def __init__(
        self,
        config: WeexConfig,
        *,
        timeout: Timeout = 30.0,
        max_retries: int = 3,
        backoff_base: float = 0.5,
        backoff_max: float = 8.0,
    ) -> None:
        """
        Initialize synchronous client wrapper.

        All arguments are stored on the instance and forwarded unchanged to
        the underlying ``WeexAsyncClient``. No event loop or thread is
        created here.

        Args:
            config: WeexConfig instance
            timeout: Request timeout in seconds
            max_retries: Maximum retry attempts
            backoff_base: Base backoff delay for retries
            backoff_max: Maximum backoff delay
        """
        self.config = config
        self.timeout = timeout
        self.max_retries = max_retries
        self.backoff_base = backoff_base
        self.backoff_max = backoff_max

        # Create async client
        self._async_client = WeexAsyncClient(
            config=config,
            timeout=timeout,
            max_retries=max_retries,
            backoff_base=backoff_base,
            backoff_max=backoff_max,
        )

        # Background loop and thread are created on first use.
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread: threading.Thread | None = None
        self._closed = False

        logger.info("WeexSyncClient initialized")

    def _ensure_event_loop(self) -> asyncio.AbstractEventLoop:
        """
        Ensure we have an event loop running in a background thread.

        Creates a new loop and daemon thread when there is no loop yet or
        the previous one has been closed, and returns only after the new
        loop has started processing callbacks.

        Returns:
            The event loop used to run this client's coroutines.
        """
        loop = self._loop
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            self._loop = loop
            self._thread = threading.Thread(
                target=self._run_event_loop,
                daemon=True,
                name="weex-sync-loop",
            )
            self._thread.start()

            # Wait for the loop to be ready: the callback can only run once
            # run_forever() is executing, so the event doubles as a
            # "loop is running" signal without polling.
            started = threading.Event()
            loop.call_soon_threadsafe(started.set)
            started.wait()
        return loop

    def _run_event_loop(self) -> None:
        """Run the event loop in a background thread until it is stopped."""
        loop = self._loop
        if loop is not None:
            asyncio.set_event_loop(loop)
            loop.run_forever()

    def _run_async(self, coro) -> Any:
        """
        Run async coroutine in background thread and return result.

        Args:
            coro: Coroutine object to execute on the background loop.

        Returns:
            Whatever the coroutine returns.

        Raises:
            RuntimeError: If the client has been closed, or if this method
                is called from the background loop thread itself (blocking
                there would stall the loop that has to run the coroutine).
            TimeoutError: If the coroutine has not finished within
                ``self.timeout + 10`` seconds; the coroutine is cancelled.
            Exception: Any exception raised by the coroutine is re-raised.
        """
        if self._closed:
            # Close the coroutine so Python does not warn that it was never
            # awaited, then fail fast instead of waiting on a stopped loop.
            coro.close()
            raise RuntimeError("WeexSyncClient is closed")

        loop = self._ensure_event_loop()

        if threading.current_thread() is self._thread:
            # The loop is already running in this thread, so it cannot be
            # driven re-entrantly; report that clearly.
            coro.close()
            raise RuntimeError(
                "WeexSyncClient cannot be used from its own event loop thread"
            )

        future = asyncio.run_coroutine_threadsafe(coro, loop)
        try:
            # The extra 10 seconds are kept from the original implementation
            # (presumably headroom so the async client's own timeout and
            # retries can fire first; confirm against WeexAsyncClient).
            return future.result(timeout=self.timeout + 10)
        finally:
            # If we leave without a result (timeout, KeyboardInterrupt, ...),
            # cancel the in-flight coroutine rather than leaving it running.
            if not future.done():
                future.cancel()

    def __enter__(self) -> Self:
        """Context manager entry; returns the client itself."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Context manager exit; closes the client without suppressing exceptions."""
        self.close()

    def get_account_balance(self) -> ResponseData:
        """GET /capi/v2/account/assets - Get account balance."""
        return self._run_async(self._async_client.get_account_balance())

    def get_all_positions(self) -> ResponseData:
        """GET /capi/v2/account/position/allPosition - Get all positions."""
        return self._run_async(self._async_client.get_all_positions())

    def get_position(self, symbol: Symbol) -> ResponseData:
        """GET /capi/v2/account/position/singlePosition - Get single position."""
        return self._run_async(self._async_client.get_position(symbol))

    def place_order(self, order_request: PlaceOrderRequest) -> dict[str, Any]:
        """POST /capi/v2/order/placeOrder - Place a new order."""
        return self._run_async(self._async_client.place_order(order_request))

    def cancel_order(
        self,
        order_id: OrderId | None = None,
        client_oid: ClientOrderId | None = None,
    ) -> dict[str, Any]:
        """
        POST /capi/v2/order/cancel_order - Cancel an order.

        Both identifiers are forwarded as-is to the async client.
        """
        return self._run_async(
            self._async_client.cancel_order(order_id=order_id, client_oid=client_oid)
        )

    def get_all_tickers(self) -> ResponseData:
        """GET /capi/v2/market/tickers - Get all tickers."""
        return self._run_async(self._async_client.get_all_tickers())

    def get_ticker(self, symbol: Symbol) -> ResponseData:
        """GET /capi/v2/market/ticker - Get ticker for specific symbol."""
        return self._run_async(self._async_client.get_ticker(symbol))

    def get_order_book(self, symbol: Symbol, limit: int = 15) -> ResponseData:
        """GET /capi/v2/market/depth - Get order book."""
        return self._run_async(
            self._async_client.request(
                "GET", f"/capi/v2/market/depth?symbol={symbol}&limit={limit}"
            )
        )

    def get_trades(self, symbol: Symbol, limit: int = 100) -> ResponseData:
        """GET /capi/v2/market/trades - Get recent trades."""
        return self._run_async(
            self._async_client.request(
                "GET", f"/capi/v2/market/trades?symbol={symbol}&limit={limit}"
            )
        )

    def get_contracts(self, symbol: Symbol | None = None) -> ResponseData:
        """
        GET /capi/v2/public/contracts - Get contract information.

        Args:
            symbol: Optional symbol; when given (and truthy) it is sent as
                the ``symbol`` query parameter, otherwise no query string
                is sent.
        """
        # NOTE(review): the other market-data endpoints in this class live
        # under /capi/v2/market/...; confirm /capi/v2/public/contracts is the
        # intended path.
        path = "/capi/v2/public/contracts"
        if symbol:
            path = f"{path}?symbol={symbol}"
        return self._run_async(self._async_client.request("GET", path))

    def get_market_overview(
        self,
        symbol: Symbol,
        *,
        order_book_limit: int = 15,
        trades_limit: int = 100,
        return_exceptions: bool = False,
    ) -> dict[str, Any]:
        """
        Get market overview with concurrent requests.

        Delegates to ``WeexAsyncClient.get_market_overview`` (which is what
        issues the underlying requests) and blocks until it completes,
        providing a synchronous interface. All arguments are forwarded
        unchanged.
        """
        return self._run_async(
            self._async_client.get_market_overview(
                symbol,
                order_book_limit=order_book_limit,
                trades_limit=trades_limit,
                return_exceptions=return_exceptions,
            )
        )

    def get_multiple_positions(
        self,
        symbols: list[Symbol],
        *,
        return_exceptions: bool = False,
    ) -> list[dict[str, Any] | Exception]:
        """
        Get positions for multiple symbols concurrently.

        Delegates to ``WeexAsyncClient.get_multiple_positions``.
        Returns results in the same order as input symbols.
        """
        return self._run_async(
            self._async_client.get_multiple_positions(
                symbols, return_exceptions=return_exceptions
            )
        )

    def close(self) -> None:
        """
        Close the client and cleanup resources.

        Closes the async client on the background loop, stops the loop,
        waits up to 5 seconds for its thread to exit and then closes the
        loop. Calling ``close()`` more than once is a no-op.

        Raises:
            TimeoutError: If the async client's ``close()`` does not finish
                within 5 seconds. The loop is still stopped in that case.
        """
        if self._closed:
            return
        self._closed = True

        loop = self._loop
        if loop is not None and not loop.is_closed():
            try:
                # Schedule async client cleanup
                asyncio.run_coroutine_threadsafe(
                    self._async_client.close(), loop
                ).result(timeout=5)
            finally:
                # Stop the event loop even if the async cleanup failed, so
                # the background thread is not left running.
                loop.call_soon_threadsafe(loop.stop)

                # Wait for thread to finish
                thread = self._thread
                if thread is not None and thread.is_alive():
                    thread.join(timeout=5)

                # Release the loop's resources; a loop that is still running
                # (join timed out) cannot be closed, so it is left alone.
                if not loop.is_running():
                    loop.close()

        logger.info("WeexSyncClient closed")

    # Legacy compatibility methods
    def get(self) -> ResponseData:
        """Legacy method for backward compatibility; same as get_account_balance()."""
        return self.get_account_balance()
# Convenience function for quick sync client creation
def create_sync_client(
    api_key: str,
    secret_key: str,
    passphrase: str,
    environment: str = "production",
    *,
    timeout: Timeout = 30.0,
    max_retries: int = 3,
    backoff_base: float = 0.5,
    backoff_max: float = 8.0,
) -> WeexSyncClient:
    """
    Build a ready-to-use synchronous Weex client from raw credentials.

    Shortcut for constructing a ``WeexConfig`` and passing it, together
    with the retry/timeout settings, to ``WeexSyncClient``.

    Args:
        api_key: Your Weex API key
        secret_key: Your Weex secret key
        passphrase: Your Weex API passphrase
        environment: Environment ('production', 'sandbox', 'test')
        timeout: Request timeout in seconds
        max_retries: Maximum retry attempts
        backoff_base: Base backoff delay for retries
        backoff_max: Maximum backoff delay

    Returns:
        Configured WeexSyncClient instance
    """
    # Function-local import of WeexConfig, kept from the original
    # implementation (its stated purpose was avoiding circular imports).
    from .config import WeexConfig

    credentials = WeexConfig(
        api_key=api_key,
        secret_key=secret_key,
        passphrase=passphrase,
        environment=environment,  # type: ignore[arg-type]
    )

    # Transport settings are passed through to the client untouched.
    client_options = {
        "timeout": timeout,
        "max_retries": max_retries,
        "backoff_base": backoff_base,
        "backoff_max": backoff_max,
    }
    return WeexSyncClient(config=credentials, **client_options)