Coverage for centralnicreseller / apiconnector / apiclient.py: 96%
236 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 15:25 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-21 15:25 +0000
1# -*- coding: utf-8 -*-
2"""
3 centralnicreseller.apiconnector.apiclient
4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 This module covers all necessary functionality for http communicatiton
6 with our Backend System.
7 :copyright: © 2024 Team Internet Group PLC.
8 :license: MIT, see LICENSE for more details.
9"""
11from centralnicreseller.apiconnector.logger import Logger
12from centralnicreseller.apiconnector.response import Response
13from centralnicreseller.apiconnector.responsetemplatemanager import (
14 ResponseTemplateManager as RTM,
15)
16from centralnicreseller.apiconnector.socketconfig import SocketConfig
17from centralnicreseller.apiconnector.idnaconverter import IDNAConverter
18from urllib.parse import quote, unquote, urlparse, urlencode
19from urllib.request import urlopen, Request
20import re
21import copy
22import platform
24rtm = RTM()
26CNR_CONNECTION_URL_PROXY = "http://127.0.0.1/api/call.cgi"
27CNR_CONNECTION_URL_LIVE = "https://api.rrpproxy.net/api/call.cgi"
28CNR_CONNECTION_URL_OTE = "https://api-ote.rrpproxy.net/api/call.cgi"
31class APIClient(object):
32 def __init__(self):
33 # API connection url
34 self.setURL(CNR_CONNECTION_URL_LIVE)
35 # Object covering API connection data
36 self.__socketConfig = SocketConfig()
37 # activity flag for debug mode
38 self.__debugMode = False
39 # API connection timeout setting
40 self.__socketTimeout = 300 * 1000
41 self.useLIVESystem()
42 # user agent setting
43 self.__ua = ""
44 # additional connection settings
45 self.__curlopts = {}
46 # logger class instance
47 self.setDefaultLogger()
48 # subuser account name (subuser specific data view)
49 self.__subUser = None
50 # login role seperator
51 self.__roleSeparator = ":"
53 def setCustomLogger(self, logger):
54 """
55 Set custom logger to use instead of the default one
56 """
57 self.__logger = logger
58 return self
60 def setDefaultLogger(self):
61 """
62 Set default logger to use
63 """
64 self.__logger = Logger()
65 return self
67 def setProxy(self, proxy):
68 """
69 Set Proxy to use for API communication
70 """
71 if proxy == "":
72 self.__curlopts.pop("PROXY", None)
73 else:
74 self.__curlopts["PROXY"] = proxy
75 return self
77 def getProxy(self):
78 """
79 Get Proxy configuration value for API communication
80 """
81 if "PROXY" in self.__curlopts:
82 return self.__curlopts["PROXY"]
83 return None
85 def setReferer(self, referer):
86 """
87 Set the Referer Header to use for API communication
88 """
89 if referer == "":
90 self.__curlopts.pop("REFERER", None)
91 else:
92 self.__curlopts["REFERER"] = referer
93 return self
95 def getReferer(self):
96 """
97 Get the Referer Header configuration value
98 """
99 if "REFERER" in self.__curlopts:
100 return self.__curlopts["REFERER"]
101 return None
103 def enableDebugMode(self):
104 """
105 Enable Debug Output to STDOUT
106 """
107 self.__debugMode = True
108 return self
110 def disableDebugMode(self):
111 """
112 Disable Debug Output
113 """
114 self.__debugMode = False
115 return self
117 def getPOSTData(self, cmd, secured=False):
118 """
119 Serialize given command for POST request including connection configuration data
120 """
121 data = self.__socketConfig.getPOSTData()
122 if secured:
123 data = re.sub(r"s_pw=[^&]+", "s_pw=***", data)
125 if isinstance(cmd, str):
126 tmp = cmd.rstrip("\n")
127 else:
128 tmp = "\n".join(
129 "{}={}".format(key, re.sub(r"[\r\n]", "", str(cmd[key])))
130 for key in sorted(cmd.keys())
131 if cmd[key] is not None
132 )
134 if secured:
135 tmp = re.sub(r"PASSWORD=[^\n]+", "PASSWORD=***", tmp)
137 if tmp:
138 return f"{data}{quote('s_command')}={quote(tmp)}"
139 else:
140 return data if not data.endswith("&") else data.rstrip("&")
142 def getURL(self):
143 """
144 Get the API connection url that is currently set
145 """
146 return self.__socketURL
148 def getUserAgent(self):
149 """
150 Get the User Agent
151 """
152 if len(self.__ua) == 0:
153 pid = "PYTHON-SDK"
154 pyv = platform.python_version()
155 pf = platform.system()
156 arch = platform.architecture()[0]
157 self.__ua = "%s (%s; %s; rv:%s) python/%s" % (
158 pid,
159 pf,
160 arch,
161 self.getVersion(),
162 pyv,
163 )
164 return self.__ua
166 def setUserAgent(self, pid, rv, modules=[]):
167 """
168 Possibility to customize default user agent to fit your needs by given string and revision
169 """
170 s = " "
171 mods = ""
172 if len(modules) > 0:
173 mods += " " + s.join(modules)
174 pyv = platform.python_version()
175 pf = platform.system()
176 arch = platform.architecture()[0]
177 self.__ua = "%s (%s; %s; rv:%s)%s python-sdk/%s python/%s" % (
178 pid,
179 pf,
180 arch,
181 rv,
182 mods,
183 self.getVersion(),
184 pyv,
185 )
186 return self
188 def getVersion(self):
189 """
190 Get the current module version
191 """
192 return "5.0.1"
194 def saveSession(self, session):
195 """
196 Apply session data (session id and user login) to given client request session
197 """
198 login = self.__socketConfig.getLogin()
199 session_id = self.__socketConfig.getSession()
200 if not login or not session_id:
201 session.pop("socketcfg", None)
202 return self
204 session["socketcfg"] = {
205 "login": login,
206 "session": session_id,
207 }
208 return self
210 def reuseSession(self, session):
211 """
212 Use existing configuration out of session
213 to rebuild and reuse connection settings
214 """
215 socketcfg = session.get("socketcfg") if session else None
216 if (
217 not socketcfg
218 or not socketcfg.get("login")
219 or not socketcfg.get("session")
220 ):
221 return self
222 self.setCredentials(socketcfg["login"])
223 self.__socketConfig.setSession(socketcfg["session"])
224 return self
226 def setURL(self, value):
227 """
228 Set another connection url to be used for API communication
229 """
230 self.__socketURL = value
231 return self
233 def setPersistent(self):
234 """echo
235 Set persistent connection to be used for API communication
236 """
237 self.__socketConfig.setPersistent()
238 return self
240 def setCredentials(self, uid, pw=""):
241 """
242 Set Credentials to be used for API communication
243 """
244 self.__socketConfig.setLogin(uid)
245 self.__socketConfig.setPassword(pw)
246 return self
248 def setRoleCredentials(self, uid, role, pw=""):
249 """
250 Set Credentials to be used for API communication
251 """
252 if role == "":
253 return self.setCredentials(uid, pw)
254 return self.setCredentials(
255 ("{0}{1}{2}").format(uid, self.__roleSeparator, role), pw
256 )
258 def login(self):
259 """
260 Perform API login to start session-based communication
261 """
262 self.setPersistent()
263 rr = self.request([], False)
264 self.__socketConfig.setSession(None) # clean up all session related data
265 if rr.isSuccess():
266 col = rr.getColumn("SESSIONID")
267 session_id = col.getData()[0] if (col is not None) else None
268 if session_id:
269 self.__socketConfig.setSession(session_id)
270 return rr
272 def logout(self):
273 """
274 Perform API logout to close API session in use
275 """
276 rr = self.request(
277 {
278 "COMMAND": "StopSession",
279 }
280 )
281 if rr.isSuccess():
282 self.__socketConfig.setSession(None) # clean up all session related data
283 return rr
285 def request(self, cmd=[], setUserView=True):
286 """
287 Perform API request using the given command
288 """
289 newcmd = {}
290 if (cmd is not None) and (len(cmd) > 0):
291 # if subuser is set, add it to the command
292 if setUserView and self.__subUser is not None:
293 cmd["SUBUSER"] = self.__subUser
295 # flatten nested api command bulk parameters
296 newcmd = self.__flattenCommand(cmd)
297 # auto convert umlaut names to punycode
298 newcmd = self.__autoIDNConvert(newcmd)
300 # request command to API
301 cfg = {"CONNECTION_URL": self.__socketURL}
302 data = self.getPOSTData(newcmd).encode("UTF-8")
303 secured = self.getPOSTData(newcmd, True).encode("UTF-8")
304 error = None
305 try:
306 headers = {"User-Agent": self.getUserAgent()}
307 if "REFERER" in self.__curlopts:
308 headers["Referer"] = self.__curlopts["REFERER"]
309 req = Request(cfg["CONNECTION_URL"], data, headers)
310 if "PROXY" in self.__curlopts:
311 proxyurl = urlparse(self.__curlopts["PROXY"])
312 req.set_proxy(proxyurl.netloc, proxyurl.scheme)
313 body = urlopen(req, timeout=self.__socketTimeout).read()
314 except Exception as e:
315 error = str(e)
316 body = rtm.getTemplate("httperror").getPlain()
317 r = Response(body, newcmd, cfg)
318 if self.__debugMode:
319 self.__logger.log(secured, r, error)
320 return r
322 def requestNextResponsePage(self, rr):
323 """
324 Request the next page of list entries for the current list query
325 Useful for tables
326 """
327 mycmd = rr.getCommand()
328 if "LAST" in mycmd:
329 raise Exception(
330 "Parameter LAST in use. Please remove it to avoid issues in requestNextPage."
331 )
332 first = 0
333 if "FIRST" in mycmd:
334 first = int(mycmd["FIRST"])
335 total = rr.getRecordsTotalCount()
336 limit = rr.getRecordsLimitation()
337 first += limit
338 if first < total:
339 mycmd["FIRST"] = first
340 mycmd["LIMIT"] = limit
341 return self.request(mycmd)
342 else:
343 return None
345 def requestAllResponsePages(self, cmd):
346 """
347 Request all pages/entries for the given query command
348 """
349 responses = []
350 mycmd = copy.deepcopy(cmd)
351 mycmd["FIRST"] = 0
352 rr = self.request(mycmd)
353 tmp = rr
354 while tmp is not None:
355 responses.append(tmp)
356 tmp = self.requestNextResponsePage(tmp)
357 if tmp is None:
358 break
359 return responses
361 def setUserView(self, uid):
362 """
363 Set a data view to a given subuser
364 """
365 self.__subUser = uid
366 return self
368 def resetUserView(self):
369 """
370 Reset data view back from subuser to user
371 """
372 self.__subUser = None
373 return self
375 def useHighPerformanceConnectionSetup(self):
376 """
377 Activate High Performance Setup
378 """
379 self.setURL(CNR_CONNECTION_URL_PROXY)
380 return self
382 def useDefaultConnectionSetup(self):
383 """
384 Activate Default Connection Setup (which is the default anyways)
385 """
386 self.setURL(CNR_CONNECTION_URL_LIVE)
387 return self
389 def useOTESystem(self):
390 """
391 Set OT&E System for API communication
392 """
393 self.setURL(CNR_CONNECTION_URL_OTE)
394 return self
396 def useLIVESystem(self):
397 """
398 Set LIVE System for API communication (this is the default setting)
399 """
400 self.setURL(CNR_CONNECTION_URL_LIVE)
401 return self
403 def __flattenCommand(self, cmd):
404 """
405 Flatten API command to handle it easier later on (nested array for bulk params)
406 """
407 newcmd = {}
408 for key in list(cmd.keys()):
409 newKey = key.upper()
410 val = cmd[key]
411 if val is None:
412 continue
413 if isinstance(val, list):
414 i = 0
415 while i < len(val):
416 newcmd[newKey + str(i)] = re.sub(r"[\r\n]", "", str(val[i]))
417 i += 1
418 else:
419 newcmd[newKey] = re.sub(r"[\r\n]", "", str(val))
420 return newcmd
422 def __autoIDNConvert(self, cmd):
423 """
424 Converts domain names in the cmd dictionary to their ASCII (Punycode) representations.
425 """
426 key_pattern = re.compile(r"(?i)^(NAMESERVER|NS|DNSZONE)([0-9]*)$")
427 obj_class_pattern = re.compile(
428 r"(?i)^(DOMAIN(APPLICATION|BLOCKING)?|NAMESERVER|NS|DNSZONE)$"
429 )
430 ascii_pattern = re.compile(r"^[A-Za-z0-9.\-]+$")
432 to_convert = []
433 idxs = []
435 for key, val in cmd.items():
436 if (
437 key_pattern.match(key)
438 or (
439 key.upper() == "OBJECTID"
440 and obj_class_pattern.match(cmd.get("OBJECTCLASS", ""))
441 )
442 ) and not ascii_pattern.match(val):
443 to_convert.append(val)
444 idxs.append(key)
446 if to_convert:
447 result = IDNAConverter.convert_list(to_convert)
448 pc_list = result.get_pc_list()
450 for idx, converted_value in zip(idxs, pc_list):
451 cmd[idx] = converted_value
453 return cmd