Coverage for yield_analysis_sdk\subgraph.py: 79%

47 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-07-23 22:10 +0800

1from typing import Any, Dict, List 

2 

3from requests import post 

4 

5from .exceptions import ConfigurationError, ConnectionError 

6from .type import Chain, SharePriceHistory 

7from .validators import validate_address_value 

8 

# Subgraph query endpoint for each supported chain. Only chains listed here
# can be queried; looking up any other chain raises KeyError.
SUBGRAPH_QUERY_URLS: Dict[Chain, str] = {
    Chain.BASE: "https://gateway.thegraph.com/api/subgraphs/id/46pQKDXgcredBSK9cbGU8qEaPEpEZgQ72hSAkpWnKinJ",
}

# GraphQL document that fetches day-interval vault stats, newest first, for
# the vaults whose address is in $vault_addresses. $length caps the TOTAL
# number of rows returned (across all requested vaults), not rows per vault.
daily_share_price_query = """
query DailyPriceHistory($vault_addresses: [Bytes!], $length: Int!) {
  vaultStats_collection(
    interval: day
    orderBy: timestamp
    orderDirection: desc
    first: $length
    where: {
      vault_: {
        address_in: $vault_addresses
      }
    }
  ) {
    timestamp
    pricePerShare
    vault {
      address
      name
      decimals
    }
  }
}
"""

36 

37 

def _format_vault_addresses(addresses: List[str]) -> List[str]:
    """Pass every vault address through ``validate_address_value``.

    Args:
        addresses: Vault addresses as supplied by the caller.

    Returns:
        The validator's result for each address, in input order. These are
        the values sent as the ``vault_addresses`` GraphQL variable.

    NOTE(review): the validator presumably lowercases the address for
    GraphQL compatibility and raises on malformed input -- confirm in
    ``validators.validate_address_value``.
    """
    return list(map(validate_address_value, addresses))

41 

42 

def _send_graphql_query_to_subgraph(
    chain: Chain,
    query: str,
    variables: Dict[str, Any],
    api_key: str,
    timeout: float = 30.0,
) -> Any:
    """POST a GraphQL query to the subgraph of ``chain`` and return its JSON.

    Args:
        chain: Chain whose endpoint is looked up in ``SUBGRAPH_QUERY_URLS``.
        query: GraphQL document to execute.
        variables: Values for the variables declared by ``query``.
        api_key: Subgraph API key, sent as a ``Bearer`` token.
        timeout: Seconds to wait for the gateway before ``requests`` gives
            up. Without it a stalled connection would block forever.

    Returns:
        The decoded JSON body of a successful response.

    Raises:
        KeyError: If ``chain`` has no entry in ``SUBGRAPH_QUERY_URLS``.
        ConnectionError: (the SDK exception imported from ``.exceptions``)
            if the HTTP status is not 200 or the body has an ``errors`` key.

    Transport-level exceptions raised by ``requests`` (including timeouts)
    are not caught here and propagate unchanged.
    """
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
    payload = {"query": query, "variables": variables}

    # requests applies no timeout unless one is passed explicitly.
    response = post(
        SUBGRAPH_QUERY_URLS[chain],
        headers=headers,
        json=payload,
        timeout=timeout,
    )

    if response.status_code != 200:
        raise ConnectionError(f"HTTP Error {response.status_code}: {response.text}")

    result = response.json()
    # GraphQL reports query-level failures in the body of a 200 response.
    if "errors" in result:
        raise ConnectionError(f"GraphQL errors: {result['errors']}")

    return result

66 

67 

68def _format_price_history_response( 

69 res: dict, underlying_asset_decimals: int 

70) -> List[SharePriceHistory]: 

71 if not res or "data" not in res or not res["data"]["vaultStats_collection"]: 

72 return [] 

73 

74 history_by_vault = {} 

75 

76 for entry in res["data"]["vaultStats_collection"]: 

77 vault_address = entry["vault"]["address"] 

78 vault_name = entry["vault"]["name"] 

79 vault_decimals = int(entry["vault"]["decimals"]) 

80 timestamp = ( 

81 int(entry["timestamp"]) // 1000000 

82 ) # Convert microseconds to seconds 

83 decimals_multiplier: float = 10 ** (vault_decimals - underlying_asset_decimals) 

84 price_per_share = float(entry["pricePerShare"]) * decimals_multiplier 

85 

86 if vault_address not in history_by_vault: 

87 history_by_vault[vault_address] = { 

88 "vault_name": vault_name, 

89 "vault_address": vault_address, 

90 "price_history": [], 

91 } 

92 

93 history_by_vault[vault_address]["price_history"].append( 

94 (timestamp, price_per_share) 

95 ) 

96 

97 # Sort price history by timestamp (oldest first) 

98 for vault_address in history_by_vault: 

99 history_by_vault[vault_address]["price_history"].sort(key=lambda x: x[0]) 

100 

101 # Convert to SharePriceHistory objects 

102 result = [] 

103 for vault_data in history_by_vault.values(): 

104 share_price_history = SharePriceHistory( 

105 vault_name=vault_data["vault_name"], 

106 vault_address=vault_data["vault_address"], 

107 price_history=vault_data["price_history"], 

108 ) 

109 result.append(share_price_history) 

110 

111 return result 

112 

113 

def get_daily_share_price_history_from_subgraph(
    chain: Chain,
    vault_addresses: List[str],
    underlying_asset_decimals: int,
    length: int,
    api_key: str,
) -> List[SharePriceHistory]:
    """
    Get the daily share price history from the subgraph for a list of vault addresses.

    Args:
        chain: The blockchain chain to query.
        vault_addresses: A list of vault addresses to query.
        underlying_asset_decimals: The number of decimals of the underlying asset. e.g. 6 for USDC.
        length: The number of days to query.
        api_key: The API key for the subgraph.

    Returns:
        One ``SharePriceHistory`` per vault that has data, or an empty list
        when ``vault_addresses`` is empty or the subgraph returns no rows.

    Raises:
        ConfigurationError: If ``api_key`` is empty.
        ConnectionError: (the SDK exception) if the subgraph request fails.

    Note:
        The query asks for ``length * len(vault_addresses)`` rows in total,
        newest first across all vaults, so a single vault is not guaranteed
        to receive exactly ``length`` points.
        NOTE(review): subgraph gateways usually cap ``first`` -- confirm the
        limit before requesting long histories for many vaults.
    """
    if not api_key:
        raise ConfigurationError("SUBGRAPH_API_KEY is required")

    # No vaults means no possible rows; skip the network round trip.
    if not vault_addresses:
        return []

    formatted_addresses = _format_vault_addresses(vault_addresses)

    variables = {
        "vault_addresses": formatted_addresses,
        # The row limit is shared by all vaults, so scale it by vault count.
        "length": length * len(formatted_addresses),
    }

    res: dict = _send_graphql_query_to_subgraph(
        chain, daily_share_price_query, variables, api_key
    )
    return _format_price_history_response(res, underlying_asset_decimals)
144 return _format_price_history_response(res, underlying_asset_decimals)