Coverage for yield_analysis_sdk\subgraph.py: 79%
47 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-23 22:10 +0800
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-23 22:10 +0800
1from typing import Any, Dict, List
3from requests import post
5from .exceptions import ConfigurationError, ConnectionError
6from .type import Chain, SharePriceHistory
7from .validators import validate_address_value
# Subgraph GraphQL endpoint per chain. Only chains listed here can be queried;
# indexing this mapping with any other chain raises KeyError.
SUBGRAPH_QUERY_URLS: Dict[Chain, str] = {
    Chain.BASE: "https://gateway.thegraph.com/api/subgraphs/id/46pQKDXgcredBSK9cbGU8qEaPEpEZgQ72hSAkpWnKinJ",
}
# GraphQL query for day-interval vault stats of the vaults whose address is in
# $vault_addresses. Rows are ordered by timestamp, newest first, and `first`
# limits the total number of rows across ALL requested vaults (not per vault).
# Each row carries the timestamp, pricePerShare and the vault's address, name
# and decimals.
daily_share_price_query = """
query DailyPriceHistory($vault_addresses: [Bytes!], $length: Int!) {
    vaultStats_collection(
        interval: day
        orderBy: timestamp
        orderDirection: desc
        first: $length
        where: {
            vault_: {
                address_in: $vault_addresses
            }
        }
    ) {
        timestamp
        pricePerShare
        vault {
            address
            name
            decimals
        }
    }
}
"""
def _format_vault_addresses(addresses: List[str]) -> List[str]:
    """Pass every vault address through ``validate_address_value``.

    The validator's return values are collected in input order and used as
    the addresses sent to the subgraph.

    NOTE(review): the validator presumably normalises the address (e.g. to
    lowercase) for GraphQL compatibility -- confirm against ``validators``.
    """
    return list(map(validate_address_value, addresses))
def _send_graphql_query_to_subgraph(
    chain: Chain,
    query: str,
    variables: Dict[str, Any],
    api_key: str,
    timeout: float = 30.0,
) -> Any:
    """POST a GraphQL query to the subgraph of ``chain`` and return the JSON body.

    Args:
        chain: Chain whose endpoint is looked up in ``SUBGRAPH_QUERY_URLS``.
        query: The GraphQL query document.
        variables: Values for the variables declared by ``query``.
        api_key: API key, sent as a ``Bearer`` token in the Authorization header.
        timeout: Seconds to wait for the server before ``requests`` gives up.
            Without a timeout a stalled gateway would block the caller forever.

    Returns:
        The decoded JSON response (including its top-level ``data`` key, if any).

    Raises:
        KeyError: If ``chain`` has no entry in ``SUBGRAPH_QUERY_URLS``.
        ConnectionError: If the HTTP status is not 200 or the response body
            contains a GraphQL ``errors`` entry.
    """
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
    payload = {"query": query, "variables": variables}

    response = post(
        SUBGRAPH_QUERY_URLS[chain],
        headers=headers,
        json=payload,
        timeout=timeout,
    )

    if response.status_code != 200:
        raise ConnectionError(f"HTTP Error {response.status_code}: {response.text}")

    result = response.json()
    # GraphQL reports query failures inside a 200 response, so the status code
    # alone is not enough to tell success from failure.
    if "errors" in result:
        raise ConnectionError(f"GraphQL errors: {result['errors']}")

    return result
68def _format_price_history_response(
69 res: dict, underlying_asset_decimals: int
70) -> List[SharePriceHistory]:
71 if not res or "data" not in res or not res["data"]["vaultStats_collection"]:
72 return []
74 history_by_vault = {}
76 for entry in res["data"]["vaultStats_collection"]:
77 vault_address = entry["vault"]["address"]
78 vault_name = entry["vault"]["name"]
79 vault_decimals = int(entry["vault"]["decimals"])
80 timestamp = (
81 int(entry["timestamp"]) // 1000000
82 ) # Convert microseconds to seconds
83 decimals_multiplier: float = 10 ** (vault_decimals - underlying_asset_decimals)
84 price_per_share = float(entry["pricePerShare"]) * decimals_multiplier
86 if vault_address not in history_by_vault:
87 history_by_vault[vault_address] = {
88 "vault_name": vault_name,
89 "vault_address": vault_address,
90 "price_history": [],
91 }
93 history_by_vault[vault_address]["price_history"].append(
94 (timestamp, price_per_share)
95 )
97 # Sort price history by timestamp (oldest first)
98 for vault_address in history_by_vault:
99 history_by_vault[vault_address]["price_history"].sort(key=lambda x: x[0])
101 # Convert to SharePriceHistory objects
102 result = []
103 for vault_data in history_by_vault.values():
104 share_price_history = SharePriceHistory(
105 vault_name=vault_data["vault_name"],
106 vault_address=vault_data["vault_address"],
107 price_history=vault_data["price_history"],
108 )
109 result.append(share_price_history)
111 return result
def get_daily_share_price_history_from_subgraph(
    chain: Chain,
    vault_addresses: List[str],
    underlying_asset_decimals: int,
    length: int,
    api_key: str,
) -> List[SharePriceHistory]:
    """
    Get the daily share price history from the subgraph for a list of vault addresses.

    Args:
        chain: The blockchain chain to query.
        vault_addresses: A list of vault addresses to query.
        underlying_asset_decimals: The number of decimals of the underlying asset. e.g. 6 for USDC.
        length: The number of days to query.
        api_key: The API key for the subgraph.

    Returns:
        One SharePriceHistory per vault that has data, each sorted oldest
        first. An empty list if ``vault_addresses`` is empty or the subgraph
        returns no rows.

    Raises:
        ConfigurationError: If ``api_key`` is empty.
        ConnectionError: If the subgraph request fails or returns GraphQL errors.
    """
    if not api_key:
        raise ConfigurationError("SUBGRAPH_API_KEY is required")

    # Nothing to look up: skip the network round trip, which would otherwise
    # be sent with an empty address filter and `first: 0`.
    if not vault_addresses:
        return []

    formatted_addresses = _format_vault_addresses(vault_addresses)

    # The query's row limit applies across all vaults combined, so request
    # `length` rows for each address.
    variables = {
        "vault_addresses": formatted_addresses,
        "length": length * len(formatted_addresses),
    }

    res: dict = _send_graphql_query_to_subgraph(
        chain, daily_share_price_query, variables, api_key
    )
    return _format_price_history_response(res, underlying_asset_decimals)