Async Client API
The WeexAsyncClient is the main interface for interacting with the Weex API using async/await patterns.
- class weex_client.client.WeexAsyncClient(config, *, timeout=30.0, max_retries=3, backoff_base=0.5, backoff_max=8.0)[source]
Bases: object
Modern async-first Weex API client for Python 3.14+.
Key Features:
- Self return type on __aenter__ for type safety
- TaskGroup for concurrent operations (Python 3.11+)
- Enhanced pattern matching for error handling
- Structured logging with contextual information
- Type safety with strict typing
- Resource management with automatic cleanup
- Rate limiting with built-in detection
- __init__(config, *, timeout=30.0, max_retries=3, backoff_base=0.5, backoff_max=8.0)[source]
Initialize WeexAsyncClient with configuration.
Python 3.14 improvements:
- Better type inference
- Enhanced error context
- Improved resource management
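The signature exposes max_retries, backoff_base and backoff_max, but this page does not state how they combine. As a rough illustration only, the sketch below assumes plain exponential backoff (the delay doubles on each retry and is capped at backoff_max); the helper name backoff_delays is hypothetical and is not part of weex_client.

```python
def backoff_delays(max_retries: int = 3, backoff_base: float = 0.5,
                   backoff_max: float = 8.0) -> list[float]:
    """Delay before each retry, ASSUMING doubling delays capped at backoff_max."""
    return [min(backoff_max, backoff_base * (2 ** attempt))
            for attempt in range(max_retries)]


print(backoff_delays())               # [0.5, 1.0, 2.0]
print(backoff_delays(max_retries=6))  # [0.5, 1.0, 2.0, 4.0, 8.0, 8.0]
```

Under that assumption, the defaults would wait at most 3.5 seconds in total across three retries; the real client may add jitter or honor a server-provided delay instead.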
- async __aenter__()[source]
Async context manager entry, annotated with the Self return type (typing.Self, available since Python 3.11).
Ensures proper resource initialization and cleanup.
- async __aexit__(exc_type, exc, tb)[source]
Async context manager exit with proper cleanup.
Python 3.14 enhances exception handling in async contexts.
- async request(method, url_or_path, *, params=None, json=None, headers=None, auth=True, body=None)[source]
Make API request with authentication and error handling.
High-level interface that handles URL preparation, authentication, and response parsing automatically.
- Parameters:
- Returns:
Parsed JSON response
- Return type:
ResponseData
- Raises:
Various WEEXError subclasses, depending on the API response
- async get_position(symbol)[source]
GET /capi/v2/account/position/singlePosition - Get single position.
- async cancel_order(order_id=None, client_oid=None)[source]
POST /capi/v2/order/cancel_order - Cancel an order.
- async get_multiple_positions(symbols, *, return_exceptions=False)[source]
Get positions for multiple symbols using TaskGroup.
Uses asyncio.TaskGroup (available since Python 3.11) to run the requests concurrently.
- async stream_tickers(symbols)[source]
Stream tickers for multiple symbols using async generator.
Implemented as an async generator for continuous data streaming; consume it with async for.
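For readers new to async generators, the following sketch shows the general shape of such a stream: a coroutine function that yields items and is consumed with async for. It polls a stub in a loop purely for illustration. Whether stream_tickers polls a REST endpoint or reads a WebSocket is not stated here, and fetch_ticker, stream, interval and max_ticks are all hypothetical names.

```python
import asyncio
from collections.abc import AsyncIterator


async def fetch_ticker(symbol: str, tick: int) -> dict:
    """Hypothetical stand-in for a single ticker request."""
    await asyncio.sleep(0)
    return {"symbol": symbol, "last": str(100 + tick)}


async def stream(symbols: list[str], *, interval: float = 0.0,
                 max_ticks: int = 2) -> AsyncIterator[dict]:
    # Yield one ticker per symbol on every round, then pause.
    for tick in range(max_ticks):
        for symbol in symbols:
            yield await fetch_ticker(symbol, tick)
        await asyncio.sleep(interval)


async def main() -> list[dict]:
    return [item async for item in stream(["BTCUSDT", "ETHUSDT"])]


items = asyncio.run(main())
print(items)
```

Because items are produced one at a time, the consumer can stop early with break and never holds more than one message in memory.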
- async get_market_overview(symbol, *, order_book_limit=15, trades_limit=100, return_exceptions=False)[source]
Get market overview with concurrent requests.
Fetches ticker, order book, trades, and contract info concurrently.
- async get_order_book(symbol, limit=15)[source]
GET /capi/v2/market/depth - Get order book.
- Parameters:
symbol (weex_client.types.TypeAliasType) – Trading symbol
limit (int) – Number of price levels (default: 15)
- Returns:
Order book data with bids and asks
- async get_trades(symbol, limit=100)[source]
GET /capi/v2/market/trades - Get recent trades.
- Parameters:
symbol (weex_client.types.TypeAliasType) – Trading symbol
limit (int) – Number of recent trades (default: 100)
- Returns:
List of recent trades
Account Management
- async WeexAsyncClient.get_account_balance()[source]
GET /capi/v2/account/assets - Get account balance.
- async WeexAsyncClient.get_all_positions()[source]
GET /capi/v2/account/position/allPosition - Get all positions.
Order Management
Market Data
- async WeexAsyncClient.get_ticker(symbol)[source]
GET /capi/v2/market/ticker - Get ticker for specific symbol.
- async WeexAsyncClient.get_order_book(symbol, limit=15)[source]
GET /capi/v2/market/depth - Get order book.
- Parameters:
symbol (weex_client.types.TypeAliasType) – Trading symbol
limit (int) – Number of price levels (default: 15)
- Returns:
Order book data with bids and asks
Advanced Features
Context Manager
The client implements the async context manager protocol for proper resource management:
import asyncio
from weex_client import WeexAsyncClient, WeexConfig

async def trading_example():
    config = WeexConfig.from_env()
    async with WeexAsyncClient(config) as client:
        # Client resources are automatically managed
        balance = await client.get_account_balance()
        print(f"Balance: {balance}")
    # Client is automatically closed when exiting the context

if __name__ == "__main__":
    asyncio.run(trading_example())
Error Handling
The client's exceptions can be handled with structural pattern matching (match/case, available since Python 3.10):
import asyncio
from weex_client import WeexAsyncClient, WeexConfig
from weex_client.exceptions import (
    WEEXAuthenticationError,
    WEEXRateLimitError,
    WEEXError
)

async def robust_trading():
    config = WeexConfig.from_env()
    async with WeexAsyncClient(config) as client:
        try:
            balance = await client.get_account_balance()
            return balance
        except WEEXError as e:
            match e:
                case WEEXAuthenticationError(code=code):
                    print(f"🔐 Authentication failed: {code}")
                    # Handle authentication issues
                case WEEXRateLimitError(retry_after=delay):
                    print(f"⏱️ Rate limited: retry after {delay}s")
                    # Implement retry logic
                case _:
                    print(f"❌ General error: {e}")
                    # Handle other errors
Basic Usage Examples
Get Account Balance
async def check_balance():
    config = WeexConfig.from_env()
    async with WeexAsyncClient(config) as client:
        balance_data = await client.get_account_balance()
        available_balance = float(balance_data["data"]["balance"])
        print(f"Available balance: ${available_balance:.2f}")
Safe Order Placement (1% Rule)
import time

from weex_client.models import PlaceOrderRequest

async def place_safe_order(symbol: str, order_type: str = "1"):
    """Place order with 1% risk management"""
    config = WeexConfig.from_env()
    async with WeexAsyncClient(config) as client:
        # Get current balance
        balance_data = await client.get_account_balance()
        available_balance = float(balance_data["data"]["balance"])
        # Cap the order value at 1% of the available balance
        max_risk_amount = available_balance * 0.01
        ticker_data = await client.get_ticker(symbol)
        current_price = float(ticker_data["data"]["last"])
        # Convert that amount into a quantity at the current price
        safe_size = max_risk_amount / current_price
        # Place order with safety
        order = PlaceOrderRequest(
            symbol=symbol,
            client_oid=f"safe_order_{int(time.time())}",
            size=str(safe_size),
            type=order_type,
            order_type="1",  # Market order
            match_price="1"  # Use current price
        )
        result = await client.place_order(order)
        print(f"✅ Safe order placed: {result}")
        return result
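The sizing arithmetic in this example can be checked in isolation. The sketch below redoes it with Decimal, which avoids float artifacts and scientific notation when the size is converted to a string, and rounds the quantity down to a lot step. The helper position_size and the 0.001 step are assumptions for illustration; the actual size increment for a given contract is not specified here.

```python
from decimal import Decimal, ROUND_DOWN


def position_size(balance: Decimal, price: Decimal,
                  fraction: Decimal = Decimal("0.01"),
                  step: Decimal = Decimal("0.001")) -> Decimal:
    """Quantity whose notional value is `fraction` of `balance`, rounded down to `step`."""
    if price <= 0:
        raise ValueError("price must be positive")
    raw = balance * fraction / price
    # Round down to a whole number of steps so the cap is never exceeded.
    return (raw / step).to_integral_value(rounding=ROUND_DOWN) * step


size = position_size(Decimal("10000"), Decimal("50000"))
print(size)  # 0.002
```

With a balance of 10000 and a price of 50000, 1% of the balance is 100, which buys 0.002 units. Note that this caps the order's notional value at 1% of the balance; it is not a stop-loss-based risk calculation, and small balances can round down to a size of zero.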
Multiple Operations with TaskGroup
async def get_trading_overview(symbols: list[str]):
    """Get concurrent market data for multiple symbols"""
    config = WeexConfig.from_env()
    async with WeexAsyncClient(config) as client:
        try:
            async with asyncio.TaskGroup() as tg:
                tasks = []
                for symbol in symbols:
                    # Start concurrent requests
                    ticker_task = tg.create_task(
                        client.get_ticker(symbol)
                    )
                    position_task = tg.create_task(
                        client.get_position(symbol)
                    )
                    tasks.append((symbol, ticker_task, position_task))
            # Every task has finished once the TaskGroup block exits
            results = {}
            for symbol, ticker_task, position_task in tasks:
                results[symbol] = {
                    "ticker": ticker_task.result(),
                    "position": position_task.result()
                }
            return results
        except* Exception as eg:
            # Handle multiple concurrent errors (the function then returns None)
            for exc in eg.exceptions:
                print(f"Error in concurrent operation: {exc}")
Real-time Data Streaming
async def monitor_market(symbol: str):
    """Monitor real-time market data"""
    config = WeexConfig.from_env()
    async with WeexAsyncClient(config) as client:
        print(f"📊 Monitoring {symbol}...")
        async for ticker_message in client.stream_tickers([symbol]):
            price = ticker_message.data.get("last", "N/A")
            timestamp = ticker_message.timestamp
            print(f"{timestamp}: {symbol} = ${price}")
            # Add your trading logic here
            # await analyze_and_trade(price, timestamp)
Position Management
async def manage_positions():
    """Monitor and manage open positions"""
    config = WeexConfig.from_env()
    async with WeexAsyncClient(config) as client:
        positions_data = await client.get_all_positions()
        if not positions_data.get("data"):
            print("📊 No open positions")
            return
        for position in positions_data["data"]:
            symbol = position.get("symbol")
            size = float(position.get("size", 0))
            side = position.get("side", "unknown")
            print(f"📈 {symbol}: {side} {size}")
            # Add position management logic
            if size > 0:
                # Position exists, consider risk management
                # (manage_position_risk is a user-defined helper, not part of the client)
                await manage_position_risk(client, symbol, position)
Performance Considerations
Connection Pooling
The client automatically manages connection pooling for optimal performance:
# Configure for high-frequency operations
config = WeexConfig(
    # ... other config
    connection=ConnectionConfig(
        max_connections=20,  # Increase for concurrent operations
        timeout=10           # Reduce timeout for faster responses
    )
)
Rate Limiting
The client includes built-in rate limit detection:
async def rate_limited_trading():
    config = WeexConfig.from_env()
    async with WeexAsyncClient(config) as client:
        try:
            # Operations will automatically respect rate limits
            results = await asyncio.gather(*[
                client.get_ticker(f"BTCUSDT{i}")
                for i in range(10)
            ])
            return results
        except WEEXRateLimitError as e:
            print(f"⏱️ Rate limited: {e.retry_after}s")
            await asyncio.sleep(e.retry_after)
            # Retry logic here
Memory Efficiency
Use streaming for large datasets:
async def process_large_dataset():
    config = WeexConfig.from_env()
    async with WeexAsyncClient(config) as client:
        # Stream instead of loading all data at once
        async for trade_data in client.stream_trades("BTCUSDT"):
            # Process each trade as it arrives
            await process_trade(trade_data)
            # Memory usage stays constant regardless of data volume
Common Patterns
Retry Logic
async def resilient_api_call(api_call, *args, max_retries=3):
    """Generic retry logic for API calls"""
    for attempt in range(max_retries):
        try:
            return await api_call(*args)
        except WEEXRateLimitError as e:
            if attempt < max_retries - 1:
                await asyncio.sleep(e.retry_after)
                continue
            raise
        except WEEXError as e:
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
                continue
            raise
Batch Operations
async def batch_order_management(order_requests: list[PlaceOrderRequest]):
    """Place multiple orders efficiently"""
    config = WeexConfig.from_env()
    async with WeexAsyncClient(config) as client:
        # Process orders in batches to respect rate limits
        batch_size = 5
        for i in range(0, len(order_requests), batch_size):
            batch = order_requests[i:i + batch_size]
            results = await asyncio.gather(*[
                client.place_order(order) for order in batch
            ], return_exceptions=True)
            for order, result in zip(batch, results):
                if isinstance(result, Exception):
                    print(f"❌ Order failed: {order.client_oid} - {result}")
                else:
                    print(f"✅ Order placed: {order.client_oid}")
            # Brief pause between batches
            if i + batch_size < len(order_requests):
                await asyncio.sleep(0.5)