Coverage for yield_analysis_sdk\subgraph.py: 79%
47 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-15 13:20 +0800
« prev ^ index » next coverage.py v7.9.1, created at 2025-08-15 13:20 +0800
1from typing import Any, Dict, List
3from requests import post
5from .exceptions import ConfigurationError, ConnectionError
6from .type import Chain, SharePriceHistory
7from .validators import validate_address_value
# Subgraph query endpoint on The Graph gateway, keyed by the chain it indexes.
# Only the chains listed here can be queried; any other Chain value is a KeyError
# at lookup time.
SUBGRAPH_QUERY_URLS: Dict[Chain, str] = {
    Chain.BASE: "https://gateway.thegraph.com/api/subgraphs/id/46pQKDXgcredBSK9cbGU8qEaPEpEZgQ72hSAkpWnKinJ",
    Chain.ARBITRUM: "https://gateway.thegraph.com/api/subgraphs/id/AH842SqnNHmMM54fY6eX9sGSV4BPo8fmeoj5C3qbNsr1",
}
# GraphQL query for daily vault stats rows, newest first, restricted to the
# given vault addresses. `$length` caps the total number of rows returned
# across all requested vaults combined.
daily_share_price_query = """
query DailyPriceHistory($vault_addresses: [Bytes!], $length: Int!) {
  vaultStats_collection(
    interval: day
    orderBy: timestamp
    orderDirection: desc
    first: $length
    where: {
      vault_: {
        address_in: $vault_addresses
      }
    }
  ) {
    timestamp
    pricePerShare
    vault {
      address
      name
      decimals
    }
  }
}
"""
39def _format_vault_addresses(addresses: List[str]) -> List[str]:
40 """Format vault addresses to lowercase for GraphQL compatibility"""
41 return [validate_address_value(addr) for addr in addresses]
def _send_graphql_query_to_subgraph(
    chain: Chain,
    query: str,
    variables: Dict[str, Any],
    api_key: str,
    timeout: float = 30.0,
) -> Any:
    """Send a GraphQL query to the subgraph of ``chain`` and return the decoded JSON.

    Args:
        chain: Chain whose endpoint is looked up in ``SUBGRAPH_QUERY_URLS``.
        query: The GraphQL query document.
        variables: Values for the variables declared by ``query``.
        api_key: Sent as a bearer token in the ``Authorization`` header.
        timeout: Seconds to wait for the gateway before giving up. Without a
            timeout a stalled connection would block the caller indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        ConnectionError: If the HTTP status is not 200, or the response body
            contains a top-level ``errors`` entry.
        KeyError: If ``chain`` has no entry in ``SUBGRAPH_QUERY_URLS``.
    """
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
    payload = {"query": query, "variables": variables}

    response = post(
        SUBGRAPH_QUERY_URLS[chain],
        headers=headers,
        json=payload,
        timeout=timeout,
    )

    if response.status_code != 200:
        raise ConnectionError(f"HTTP Error {response.status_code}: {response.text}")

    result = response.json()
    # GraphQL reports query failures in the body even when the HTTP status is 200.
    if "errors" in result:
        raise ConnectionError(f"GraphQL errors: {result['errors']}")

    return result
69def _format_price_history_response(
70 res: dict, underlying_asset_decimals: int
71) -> List[SharePriceHistory]:
72 if not res or "data" not in res or not res["data"]["vaultStats_collection"]:
73 return []
75 history_by_vault = {}
77 for entry in res["data"]["vaultStats_collection"]:
78 vault_address = entry["vault"]["address"]
79 vault_name = entry["vault"]["name"]
80 vault_decimals = int(entry["vault"]["decimals"])
81 timestamp = (
82 int(entry["timestamp"]) // 1000000
83 ) # Convert microseconds to seconds
84 decimals_multiplier: float = 10 ** (vault_decimals - underlying_asset_decimals)
85 price_per_share = float(entry["pricePerShare"]) * decimals_multiplier
87 if vault_address not in history_by_vault:
88 history_by_vault[vault_address] = {
89 "vault_name": vault_name,
90 "vault_address": vault_address,
91 "price_history": [],
92 }
94 history_by_vault[vault_address]["price_history"].append(
95 (timestamp, price_per_share)
96 )
98 # Sort price history by timestamp (oldest first)
99 for vault_address in history_by_vault:
100 history_by_vault[vault_address]["price_history"].sort(key=lambda x: x[0])
102 # Convert to SharePriceHistory objects
103 result = []
104 for vault_data in history_by_vault.values():
105 share_price_history = SharePriceHistory(
106 vault_name=vault_data["vault_name"],
107 vault_address=vault_data["vault_address"],
108 price_history=vault_data["price_history"],
109 )
110 result.append(share_price_history)
112 return result
def get_daily_share_price_history_from_subgraph(
    chain: Chain,
    vault_addresses: List[str],
    underlying_asset_decimals: int,
    length: int,
    api_key: str,
) -> List[SharePriceHistory]:
    """
    Get the daily share price history from the subgraph for a list of vault addresses.

    Args:
        chain: The blockchain chain to query.
        vault_addresses: A list of vault addresses to query.
        underlying_asset_decimals: The number of decimals of the underlying asset. e.g. 6 for USDC.
        length: The number of days to query.
        api_key: The API key for the subgraph.

    Returns:
        One SharePriceHistory per vault that has data, each sorted oldest
        first. An empty list when ``vault_addresses`` is empty or the subgraph
        returns no rows.

    Raises:
        ConfigurationError: If ``api_key`` is empty.
        ConnectionError: If the subgraph request fails or reports GraphQL errors.
    """
    if not api_key:
        raise ConfigurationError("SUBGRAPH_API_KEY is required")

    # Nothing to look up: skip the network round trip entirely.
    if not vault_addresses:
        return []

    formatted_addresses = _format_vault_addresses(vault_addresses)

    variables = {
        "vault_addresses": formatted_addresses,
        # The query's `first` limit applies to the combined result set, so
        # request `length` rows for every vault.
        "length": length * len(formatted_addresses),
    }

    res: dict = _send_graphql_query_to_subgraph(
        chain, daily_share_price_query, variables, api_key
    )
    return _format_price_history_response(res, underlying_asset_decimals)