Coverage for centralnicreseller / apiconnector / apiclient.py: 96%

236 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-21 15:25 +0000

1# -*- coding: utf-8 -*- 

2""" 

3 centralnicreseller.apiconnector.apiclient 

4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 

5 This module covers all necessary functionality for http communication 

6 with our Backend System. 

7 :copyright: © 2024 Team Internet Group PLC. 

8 :license: MIT, see LICENSE for more details. 

9""" 

10 

11from centralnicreseller.apiconnector.logger import Logger 

12from centralnicreseller.apiconnector.response import Response 

13from centralnicreseller.apiconnector.responsetemplatemanager import ( 

14 ResponseTemplateManager as RTM, 

15) 

16from centralnicreseller.apiconnector.socketconfig import SocketConfig 

17from centralnicreseller.apiconnector.idnaconverter import IDNAConverter 

18from urllib.parse import quote, unquote, urlparse, urlencode 

19from urllib.request import urlopen, Request 

20import re 

21import copy 

22import platform 

23 

# module-wide response template manager; APIClient.request() reads the
# "httperror" template from it when the HTTP call fails
rtm = RTM()

# endpoint selected by APIClient.useHighPerformanceConnectionSetup()
CNR_CONNECTION_URL_PROXY = "http://127.0.0.1/api/call.cgi"
# endpoint of the LIVE system (the default after APIClient construction)
CNR_CONNECTION_URL_LIVE = "https://api.rrpproxy.net/api/call.cgi"
# endpoint of the OT&E system, selected by APIClient.useOTESystem()
CNR_CONNECTION_URL_OTE = "https://api-ote.rrpproxy.net/api/call.cgi"

29 

30 

class APIClient(object):
    """
    Client for HTTP communication with the backend API.

    Holds the connection settings (url, credentials/session, proxy, referer,
    user agent), serializes commands into POST data and wraps the API answer
    into a Response instance. All setters return ``self`` to allow chaining.
    """

    # line breaks are stripped from command values because the serialized
    # command uses one "KEY=VALUE" pair per line
    _LINE_BREAKS = re.compile(r"[\r\n]")
    # command parameters whose values are candidates for punycode conversion
    _IDN_KEY_PATTERN = re.compile(r"(?i)^(NAMESERVER|NS|DNSZONE)([0-9]*)$")
    # OBJECTCLASS values for which OBJECTID is a candidate for conversion
    _IDN_OBJECT_CLASS_PATTERN = re.compile(
        r"(?i)^(DOMAIN(APPLICATION|BLOCKING)?|NAMESERVER|NS|DNSZONE)$"
    )
    # values matching this pattern are plain ASCII names and need no conversion
    _ASCII_NAME_PATTERN = re.compile(r"^[A-Za-z0-9.\-]+$")

    def __init__(self):
        # API connection url (LIVE system by default)
        self.useLIVESystem()
        # Object covering API connection data
        self.__socketConfig = SocketConfig()
        # activity flag for debug mode
        self.__debugMode = False
        # API connection timeout setting in milliseconds
        # (converted to seconds when handed over to urlopen)
        self.__socketTimeout = 300 * 1000
        # user agent setting (built lazily by getUserAgent)
        self.__ua = ""
        # additional connection settings (keys in use: PROXY, REFERER)
        self.__curlopts = {}
        # logger class instance
        self.setDefaultLogger()
        # subuser account name (subuser specific data view)
        self.__subUser = None
        # login role separator
        self.__roleSeparator = ":"

    def setCustomLogger(self, logger):
        """
        Set custom logger to use instead of the default one.
        The logger's ``log(post, response, error)`` method is called by
        request() when debug mode is enabled.
        """
        self.__logger = logger
        return self

    def setDefaultLogger(self):
        """
        Set default logger to use
        """
        self.__logger = Logger()
        return self

    def setProxy(self, proxy):
        """
        Set Proxy to use for API communication.
        An empty value (``""`` or ``None``) removes the proxy setting.
        """
        if not proxy:
            self.__curlopts.pop("PROXY", None)
        else:
            self.__curlopts["PROXY"] = proxy
        return self

    def getProxy(self):
        """
        Get Proxy configuration value for API communication
        (None if no proxy is configured)
        """
        return self.__curlopts.get("PROXY")

    def setReferer(self, referer):
        """
        Set the Referer Header to use for API communication.
        An empty value (``""`` or ``None``) removes the referer setting.
        """
        if not referer:
            self.__curlopts.pop("REFERER", None)
        else:
            self.__curlopts["REFERER"] = referer
        return self

    def getReferer(self):
        """
        Get the Referer Header configuration value
        (None if no referer is configured)
        """
        return self.__curlopts.get("REFERER")

    def enableDebugMode(self):
        """
        Enable Debug Output (request() then forwards every request to the logger)
        """
        self.__debugMode = True
        return self

    def disableDebugMode(self):
        """
        Disable Debug Output
        """
        self.__debugMode = False
        return self

    def getPOSTData(self, cmd, secured=False):
        """
        Serialize given command for POST request including connection configuration data.

        ``cmd`` is either an already serialized command string or a dict of
        parameters; dict entries with value None are skipped and the keys are
        emitted in sorted order. An empty command (``{}``, ``""``, ``None``)
        yields the connection data only. With ``secured=True`` the password
        values (``s_pw`` and the ``PASSWORD`` parameter) are replaced by ``***``
        so that the result can be logged.
        """
        data = self.__socketConfig.getPOSTData()
        if secured:
            data = re.sub(r"s_pw=[^&]+", "s_pw=***", data)

        if isinstance(cmd, str):
            tmp = cmd.rstrip("\n")
        elif cmd:
            tmp = "\n".join(
                "{}={}".format(key, self._LINE_BREAKS.sub("", str(cmd[key])))
                for key in sorted(cmd.keys())
                if cmd[key] is not None
            )
        else:
            tmp = ""

        if secured:
            tmp = re.sub(r"PASSWORD=[^\n]+", "PASSWORD=***", tmp)

        if tmp:
            return f"{data}{quote('s_command')}={quote(tmp)}"
        # no command to append: drop the trailing parameter separator(s)
        return data.rstrip("&")

    def getURL(self):
        """
        Get the API connection url that is currently set
        """
        return self.__socketURL

    def getUserAgent(self):
        """
        Get the User Agent.
        If none has been set via setUserAgent, a default one is built from
        platform data and the module version and cached for later calls.
        """
        if not self.__ua:
            self.__ua = "%s (%s; %s; rv:%s) python/%s" % (
                "PYTHON-SDK",
                platform.system(),
                platform.architecture()[0],
                self.getVersion(),
                platform.python_version(),
            )
        return self.__ua

    def setUserAgent(self, pid, rv, modules=None):
        """
        Possibility to customize default user agent to fit your needs by given string and revision.

        ``pid`` is the product identifier, ``rv`` its revision and ``modules``
        an optional list of further "name/version" strings that get appended
        space-separated.
        """
        mods = ""
        if modules:
            mods = " " + " ".join(modules)
        self.__ua = "%s (%s; %s; rv:%s)%s python-sdk/%s python/%s" % (
            pid,
            platform.system(),
            platform.architecture()[0],
            rv,
            mods,
            self.getVersion(),
            platform.python_version(),
        )
        return self

    def getVersion(self):
        """
        Get the current module version
        """
        return "5.0.1"

    def saveSession(self, session):
        """
        Apply session data (session id and user login) to given client request session.
        If login or session id is missing, a stored "socketcfg" entry is removed instead.
        """
        login = self.__socketConfig.getLogin()
        session_id = self.__socketConfig.getSession()
        if not login or not session_id:
            session.pop("socketcfg", None)
            return self

        session["socketcfg"] = {
            "login": login,
            "session": session_id,
        }
        return self

    def reuseSession(self, session):
        """
        Use existing configuration out of session
        to rebuild and reuse connection settings.
        Nothing is changed if the session holds no complete "socketcfg" entry.
        """
        socketcfg = session.get("socketcfg") if session else None
        if (
            not socketcfg
            or not socketcfg.get("login")
            or not socketcfg.get("session")
        ):
            return self
        self.setCredentials(socketcfg["login"])
        self.__socketConfig.setSession(socketcfg["session"])
        return self

    def setURL(self, value):
        """
        Set another connection url to be used for API communication
        """
        self.__socketURL = value
        return self

    def setPersistent(self):
        """
        Set persistent connection to be used for API communication
        """
        self.__socketConfig.setPersistent()
        return self

    def setCredentials(self, uid, pw=""):
        """
        Set Credentials to be used for API communication
        """
        self.__socketConfig.setLogin(uid)
        self.__socketConfig.setPassword(pw)
        return self

    def setRoleCredentials(self, uid, role, pw=""):
        """
        Set Credentials of a role user to be used for API communication.
        The login becomes "<uid><separator><role>"; an empty role
        (``""`` or ``None``) falls back to the plain user credentials.
        """
        if not role:
            return self.setCredentials(uid, pw)
        return self.setCredentials(
            "{0}{1}{2}".format(uid, self.__roleSeparator, role), pw
        )

    def login(self):
        """
        Perform API login to start session-based communication.
        Returns the API response; on success the returned SESSIONID is stored
        for the following requests.
        """
        self.setPersistent()
        rr = self.request({}, False)
        self.__socketConfig.setSession(None)  # clean up all session related data
        if rr.isSuccess():
            col = rr.getColumn("SESSIONID")
            data = col.getData() if col is not None else None
            # guard against a missing or empty SESSIONID column
            session_id = data[0] if data else None
            if session_id:
                self.__socketConfig.setSession(session_id)
        return rr

    def logout(self):
        """
        Perform API logout to close API session in use.
        Returns the API response; the stored session is dropped on success.
        """
        rr = self.request({"COMMAND": "StopSession"})
        if rr.isSuccess():
            self.__socketConfig.setSession(None)  # clean up all session related data
        return rr

    def request(self, cmd=None, setUserView=True):
        """
        Perform API request using the given command.

        ``cmd`` is a dict of API parameters (list values are treated as bulk
        parameters); it is not modified. With ``setUserView=True`` a configured
        subuser is added as SUBUSER parameter. Any failure of the HTTP call is
        converted into a Response built from the "httperror" template, so no
        exception is raised for connection problems.
        """
        newcmd = self.__prepareCommand(cmd, setUserView)

        # request command to API
        cfg = {"CONNECTION_URL": self.__socketURL}
        data = self.getPOSTData(newcmd).encode("UTF-8")
        secured = self.getPOSTData(newcmd, True).encode("UTF-8")
        error = None
        try:
            headers = {"User-Agent": self.getUserAgent()}
            if "REFERER" in self.__curlopts:
                headers["Referer"] = self.__curlopts["REFERER"]
            req = Request(cfg["CONNECTION_URL"], data, headers)
            if "PROXY" in self.__curlopts:
                proxyurl = urlparse(self.__curlopts["PROXY"])
                req.set_proxy(proxyurl.netloc, proxyurl.scheme)
            # urlopen expects seconds while the setting is kept in milliseconds;
            # the context manager closes the connection after reading
            with urlopen(req, timeout=self.__socketTimeout / 1000) as resp:
                body = resp.read()
        except Exception as e:
            # deliberate catch-all: callers get an error response, not an exception
            error = str(e)
            body = rtm.getTemplate("httperror").getPlain()
        r = Response(body, newcmd, cfg)
        if self.__debugMode:
            self.__logger.log(secured, r, error)
        return r

    def requestNextResponsePage(self, rr):
        """
        Request the next page of list entries for the current list query
        Useful for tables

        Returns the response of the next page or None if there is no further
        page. Raises an Exception if the command of ``rr`` uses parameter LAST.
        """
        # copy: the command stored in the given response must stay untouched
        mycmd = dict(rr.getCommand())
        if "LAST" in mycmd:
            raise Exception(
                "Parameter LAST in use. Please remove it to avoid issues in requestNextPage."
            )
        first = int(mycmd.get("FIRST", 0))
        total = rr.getRecordsTotalCount()
        limit = rr.getRecordsLimitation()
        if limit <= 0:
            # no progress possible; would re-request the same page forever
            return None
        first += limit
        if first >= total:
            return None
        mycmd["FIRST"] = first
        mycmd["LIMIT"] = limit
        return self.request(mycmd)

    def requestAllResponsePages(self, cmd):
        """
        Request all pages/entries for the given query command.
        Returns the list of responses, one per page, starting at FIRST=0.
        The given command is not modified.
        """
        mycmd = copy.deepcopy(cmd)
        mycmd["FIRST"] = 0
        responses = []
        page = self.request(mycmd)
        while page is not None:
            responses.append(page)
            page = self.requestNextResponsePage(page)
        return responses

    def setUserView(self, uid):
        """
        Set a data view to a given subuser
        """
        self.__subUser = uid
        return self

    def resetUserView(self):
        """
        Reset data view back from subuser to user
        """
        self.__subUser = None
        return self

    def useHighPerformanceConnectionSetup(self):
        """
        Activate High Performance Setup (connection url of the local proxy)
        """
        self.setURL(CNR_CONNECTION_URL_PROXY)
        return self

    def useDefaultConnectionSetup(self):
        """
        Activate Default Connection Setup (which is the default anyways)
        """
        self.setURL(CNR_CONNECTION_URL_LIVE)
        return self

    def useOTESystem(self):
        """
        Set OT&E System for API communication
        """
        self.setURL(CNR_CONNECTION_URL_OTE)
        return self

    def useLIVESystem(self):
        """
        Set LIVE System for API communication (this is the default setting)
        """
        self.setURL(CNR_CONNECTION_URL_LIVE)
        return self

    def __prepareCommand(self, cmd, setUserView):
        """
        Build the command that is sent to the API out of the caller's command:
        add the subuser view, flatten bulk parameters and convert IDN names.
        Works on a copy, the given command is left untouched.
        """
        if not cmd:
            return {}
        cmd = dict(cmd)
        # if subuser is set, add it to the command
        if setUserView and self.__subUser is not None:
            cmd["SUBUSER"] = self.__subUser
        # flatten nested api command bulk parameters
        newcmd = self.__flattenCommand(cmd)
        # auto convert umlaut names to punycode
        return self.__autoIDNConvert(newcmd)

    def __flattenCommand(self, cmd):
        """
        Flatten API command to handle it easier later on (nested array for bulk params).
        Keys are upper-cased, None values dropped, list values expanded to
        KEY0, KEY1, ... and all values converted to strings without line breaks.
        """
        newcmd = {}
        for key, val in cmd.items():
            if val is None:
                continue
            newKey = key.upper()
            if isinstance(val, list):
                for i, item in enumerate(val):
                    newcmd[newKey + str(i)] = self._LINE_BREAKS.sub("", str(item))
            else:
                newcmd[newKey] = self._LINE_BREAKS.sub("", str(val))
        return newcmd

    def __autoIDNConvert(self, cmd):
        """
        Converts domain names in the cmd dictionary to their ASCII (Punycode) representations.
        Only non-ASCII values of the NAMESERVER/NS/DNSZONE parameters and of
        OBJECTID (for a matching OBJECTCLASS) are converted. The given dict is
        updated in place and returned.
        """
        idxs = []
        to_convert = []
        for key, val in cmd.items():
            is_candidate = self._IDN_KEY_PATTERN.match(key) or (
                key.upper() == "OBJECTID"
                and self._IDN_OBJECT_CLASS_PATTERN.match(cmd.get("OBJECTCLASS", ""))
            )
            if is_candidate and not self._ASCII_NAME_PATTERN.match(val):
                idxs.append(key)
                to_convert.append(val)

        if to_convert:
            # one bulk conversion for all collected values
            result = IDNAConverter.convert_list(to_convert)
            for idx, converted_value in zip(idxs, result.get_pc_list()):
                cmd[idx] = converted_value

        return cmd