Coverage for yield_analysis_sdk\subgraph.py: 79%

47 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-08-15 13:20 +0800

1from typing import Any, Dict, List 

2 

3from requests import post 

4 

5from .exceptions import ConfigurationError, ConnectionError 

6from .type import Chain, SharePriceHistory 

7from .validators import validate_address_value 

8 

9SUBGRAPH_QUERY_URLS = { 

10 Chain.BASE: "https://gateway.thegraph.com/api/subgraphs/id/46pQKDXgcredBSK9cbGU8qEaPEpEZgQ72hSAkpWnKinJ", 

11 Chain.ARBITRUM: "https://gateway.thegraph.com/api/subgraphs/id/AH842SqnNHmMM54fY6eX9sGSV4BPo8fmeoj5C3qbNsr1", 

12} 

13 

14daily_share_price_query = """ 

15query DailyPriceHistory($vault_addresses: [Bytes!], $length: Int!) { 

16 vaultStats_collection( 

17 interval: day 

18 orderBy: timestamp 

19 orderDirection: desc 

20 first: $length 

21 where: { 

22 vault_: { 

23 address_in: $vault_addresses 

24 } 

25 } 

26 ) { 

27 timestamp 

28 pricePerShare 

29 vault { 

30 address 

31 name 

32 decimals 

33 } 

34 } 

35} 

36""" 

37 

38 

39def _format_vault_addresses(addresses: List[str]) -> List[str]: 

40 """Format vault addresses to lowercase for GraphQL compatibility""" 

41 return [validate_address_value(addr) for addr in addresses] 

42 

43 

44def _send_graphql_query_to_subgraph( 

45 chain: Chain, 

46 query: str, 

47 variables: Dict[str, Any], 

48 api_key: str, 

49) -> Any: 

50 headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"} 

51 

52 # Prepare the request payload 

53 payload = {"query": query, "variables": variables} 

54 

55 # Send the GraphQL request to the Subgraph 

56 response = post(SUBGRAPH_QUERY_URLS[chain], headers=headers, json=payload) 

57 

58 # Check if the request was successful 

59 if response.status_code == 200: 

60 result = response.json() 

61 if "errors" in result: 

62 raise ConnectionError(f"GraphQL errors: {result['errors']}") 

63 else: 

64 raise ConnectionError(f"HTTP Error {response.status_code}: {response.text}") 

65 

66 return result 

67 

68 

69def _format_price_history_response( 

70 res: dict, underlying_asset_decimals: int 

71) -> List[SharePriceHistory]: 

72 if not res or "data" not in res or not res["data"]["vaultStats_collection"]: 

73 return [] 

74 

75 history_by_vault = {} 

76 

77 for entry in res["data"]["vaultStats_collection"]: 

78 vault_address = entry["vault"]["address"] 

79 vault_name = entry["vault"]["name"] 

80 vault_decimals = int(entry["vault"]["decimals"]) 

81 timestamp = ( 

82 int(entry["timestamp"]) // 1000000 

83 ) # Convert microseconds to seconds 

84 decimals_multiplier: float = 10 ** (vault_decimals - underlying_asset_decimals) 

85 price_per_share = float(entry["pricePerShare"]) * decimals_multiplier 

86 

87 if vault_address not in history_by_vault: 

88 history_by_vault[vault_address] = { 

89 "vault_name": vault_name, 

90 "vault_address": vault_address, 

91 "price_history": [], 

92 } 

93 

94 history_by_vault[vault_address]["price_history"].append( 

95 (timestamp, price_per_share) 

96 ) 

97 

98 # Sort price history by timestamp (oldest first) 

99 for vault_address in history_by_vault: 

100 history_by_vault[vault_address]["price_history"].sort(key=lambda x: x[0]) 

101 

102 # Convert to SharePriceHistory objects 

103 result = [] 

104 for vault_data in history_by_vault.values(): 

105 share_price_history = SharePriceHistory( 

106 vault_name=vault_data["vault_name"], 

107 vault_address=vault_data["vault_address"], 

108 price_history=vault_data["price_history"], 

109 ) 

110 result.append(share_price_history) 

111 

112 return result 

113 

114 

115def get_daily_share_price_history_from_subgraph( 

116 chain: Chain, 

117 vault_addresses: List[str], 

118 underlying_asset_decimals: int, 

119 length: int, 

120 api_key: str, 

121) -> List[SharePriceHistory]: 

122 """ 

123 Get the daily share price history from the subgraph for a list of vault addresses. 

124 

125 Args: 

126 chain: The blockchain chain to query. 

127 vault_addresses: A list of vault addresses to query. 

128 underlying_asset_decimals: The number of decimals of the underlying asset. e.g. 6 for USDC. 

129 length: The number of days to query. 

130 api_key: The API key for the subgraph. 

131 """ 

132 if not api_key: 

133 raise ConfigurationError("SUBGRAPH_API_KEY is required") 

134 

135 formatted_addresses = _format_vault_addresses(vault_addresses) 

136 

137 variables = { 

138 "vault_addresses": formatted_addresses, 

139 "length": length * len(formatted_addresses), 

140 } 

141 

142 res: dict = _send_graphql_query_to_subgraph( 

143 chain, daily_share_price_query, variables, api_key 

144 ) 

145 return _format_price_history_response(res, underlying_asset_decimals)