# -*- coding: utf-8 -*-
import xml.etree.ElementTree as etree
import urllib2 as urllib23
from xml.sax.saxutils import escape
import importlib
import types
from collections import OrderedDict
[docs]class XMLWrapper:
@staticmethod
[docs] def parseXML(xml_str):
return etree.fromstring(xml_str)
@staticmethod
[docs] def getRootNode(node):
return node.getroot()
@staticmethod
[docs] def getChildNode(parent_node, child_name, find_prefix=''):
return parent_node.find(find_prefix + child_name)
@staticmethod
[docs] def getAllChildNode(parent_node, child_name, find_prefix=''):
return parent_node.findall(find_prefix + child_name)
@staticmethod
[docs] def getChildNodeByAttr(parent_node, child_name, attr_name, attr_value):
return parent_node.find(child_name + '[@' + attr_name + '=\'' + attr_value + '\']')
@staticmethod
[docs] def getAllChildNodeByAttr(parent_node, child_name, attr_name, attr_value):
return parent_node.findall(child_name + '[@' + attr_name + '=\'' + attr_value + '\']')
@staticmethod
[docs] def getNodeName(node):
return node.tag
@staticmethod
[docs] def getNodeText(node):
return node.text
@staticmethod
[docs] def getNodeAttr(node, attribute_name, find_prefix=''):
return node.get(find_prefix + attribute_name)
@staticmethod
[docs] def getNodePrefix(node, prefix_name):
prefix = ''
if len(node.attrib) == 0:
return prefix
for attrib_name, value in node.attrib.items():
if attrib_name[0] == "{":
prefix, ignore, tag = attrib_name[1:].partition("}")
return "{" + prefix + "}"
return prefix
@staticmethod
[docs] def getStringFromXML(node, pretty_print=False):
return etree.tostring(node, pretty_print=pretty_print)
# map request class
[docs]class CommonAPIRequest:
def __init__(self, **kwarg):
for key, value in kwarg.items():
setattr(self, key, value)
@staticmethod
def _checkContainerValue(value):
result_value = None
if isinstance(value, list):
result_value = list()
for list_value in value:
result_value.append(CommonAPIRequest.toContainer(list_value))
elif isinstance(value, CommonAPIRequest):
result_value = CommonAPIRequest.toContainer(value)
else:
result_value = value
return result_value
@staticmethod
[docs] def toContainer(data):
if isinstance(data, dict) or isinstance(data, OrderedDict):
return data
if isinstance(data, list):
data_list = list()
for value in data:
data_list.append(CommonAPIRequest._checkContainerValue(value))
return data_list
data_dict = dict()
data_dict['__name__'] = data.__class__.__name__
for key, value in data.__dict__.items():
data_dict[key] = CommonAPIRequest._checkContainerValue(value)
return data_dict
# end map request class
[docs]class CommonResponseInfo:
def _attributeCastToType(self, data_str, cast_type_name):
default_value = 0
if cast_type_name == 'bool':
default_value = False
elif cast_type_name == 'float':
default_value = 0.0
elif cast_type_name == 'str':
default_value = ''
cast_type = eval(cast_type_name)
data = None
if data_str is not None:
data = default_value
try:
if cast_type_name == 'bool':
data = (data_str.lower() in ['true', '1', 'yes', 'on'])
else:
data = cast_type(data_str)
except ValueError, err:
pass
return data
def _isAttributeTypeDefault(self, attr_type_name):
return (attr_type_name == 'int' or attr_type_name == 'long' or
attr_type_name == 'float' or attr_type_name == 'bool' or attr_type_name == 'str')
def _is_empty_object(self, atrrib_data):
for key, value in atrrib_data.items():
if isinstance(value, list) and len(value) > 0:
return False
if value is not None:
return False
return True
def _append_object_list(self, attr_type_name, list_node, attr_type_instance, class_type, find_prefix):
if self._isAttributeTypeDefault(attr_type_name):
data_str = XMLWrapper.getNodeText(list_node)
data = self._attributeCastToType(data_str, attr_type_name)
else:
if attr_type_instance == object:
data = class_type(list_node, find_prefix)
else:
data = attr_type_instance(list_node, find_prefix)
if hasattr(data, "is_empty_object"):
return None
else:
return data
def _parseAttributesData(self, class_type, xml_object, find_prefix):
attrib_data_dict = dict()
empty_object_size = len(self.__dict__)
for name, attr_type in self.__dict__.items():
if not isinstance(attr_type, (types.TypeType, types.ClassType)) and not isinstance(attr_type, dict):
continue
if not isinstance(attr_type, dict):
data = None
attr_type_name = attr_type.__name__;
if self._isAttributeTypeDefault(attr_type_name):
data_str = XMLWrapper.getNodeAttr(xml_object, name)
if data_str is None:
child_attribute = XMLWrapper.getChildNode(xml_object, name)
if child_attribute is not None:
data_str = XMLWrapper.getNodeText(child_attribute)
data = self._attributeCastToType(data_str, attr_type_name)
else:
child_node = XMLWrapper.getChildNode(xml_object, name)
if child_node is None:
continue
child_type = XMLWrapper.getNodeAttr(child_node, 'type', find_prefix)
if child_type is None:
data = attr_type(child_node, find_prefix)
else:
data = child_type(child_node, find_prefix)
attrib_data_dict[name] = data
else:
child_node = XMLWrapper.getChildNode(xml_object, name)
data_list = list()
attr_type_instance = attr_type['list']
attr_type_name = attr_type_instance.__name__;
if child_node is not None:
child_count = 0
for list_node in child_node:
data_object = self._append_object_list(attr_type_name, list_node, attr_type_instance,
class_type, find_prefix)
if data_object is not None:
data_list.append(data_object)
child_count += 1
# I think its a logical bug, but ...
if child_count == 0:
for list_node in xml_object:
if XMLWrapper.getNodeName(list_node) == name:
data_object = self._append_object_list(attr_type_name, list_node, attr_type_instance,
class_type, find_prefix)
if data_object is not None:
data_list.append(data_object)
attrib_data_dict[name] = data_list
if not self._is_empty_object(attrib_data_dict):
for key, value in attrib_data_dict.items():
setattr(self, key, value)
elif len(self.__dict__) == empty_object_size:
setattr(self, "is_empty_object", True)
def __init__(self, xml_object, find_prefix):
self._parseAttributesData(self.__class__, xml_object, find_prefix)
[docs]class CommonApiResult:
def __init__(self, xml_object):
error_node = XMLWrapper.getChildNode(xml_object, 'Error')
self.error = None if error_node is None else XMLWrapper.getNodeText(error_node)
error_code_node = XMLWrapper.getChildNode(xml_object, 'ErrorCode')
self.error_code = None if error_code_node is None else XMLWrapper.getNodeText(error_code_node)
self.response_info = None
response_info_node = XMLWrapper.getChildNode(xml_object, 'ResponseInfo')
if response_info_node is not None:
find_prefix = XMLWrapper.getNodePrefix(response_info_node, 'xsi')
type_attr = XMLWrapper.getNodeAttr(response_info_node, find_prefix + 'type')
if not type_attr is None:
response_class = CommonApiResult.importAPIClass(type_attr)
if response_class is not None:
self.response_info = response_class(response_info_node, find_prefix)
success = XMLWrapper.getNodeAttr(xml_object, 'Success')
success = success.lower()
self.success = success in ['true', 'yes', 'on']
@staticmethod
[docs] def importAPIClass(name):
module = importlib.import_module('cloudshell.api.cloudshell_api')
if hasattr(module, name):
return getattr(module, name)
return None
[docs]class CloudShellAPIError(Exception):
def __init__(self, code, message, rawxml):
self.code = code
self.message = message
self.rawxml = rawxml
def __str__(self):
return 'CloudShell API error ' + str(self.code) + ': ' + self.message
def __repr__(self):
return 'CloudShell API error ' + str(self.code) + ': ' + self.message
[docs]class CommonAPISession:
def __init__(self, host, username, password, domain):
self.host = host
self.username = username
self.password = password
self.domain = domain
def _parseXML(self, xml_str):
return etree.fromstring(xml_str)
def _replaceSendValue(self, data):
"""Normalize xml string, escape special xml characters
"""
if data is None: return u''
try:
data_str = unicode(data)
except:
data_str = unicode(data.decode("utf-8"))
data_str = u"".join([escape(char) for char in data_str])
if data_str == 'True' or data_str == 'False':
return data_str.lower()
else:
return data_str
def _to_unicode_string(self, data):
if data is None:
return u''
try:
return unicode(data)
except:
return unicode(data.decode("utf-8"))
def _encodeHeaders(self):
self.headers = dict((key.encode('ascii') if isinstance(key, unicode) else key,
value.encode('ascii') if isinstance(value, unicode) else value)
for key, value in self.headers.items())
def _sendRequest(self, operation, message, request_headers):
"""
Sending http POST request through URLLIB package
:param operation: operation name
:param message: request body
:param request_headers: header of the request
:return: responce string data
"""
operation_url = str(self.url + operation)
request_object = urllib23.Request(operation_url, message.encode('utf-8'), request_headers)
response = urllib23.urlopen(request_object)
return response.read()
# THE METHOD TO DELETE
def _serializeRequestData(self, object_data, prev_type=None):
"""Deprecated method which generated xml request using string concatenation, replaced with _new_serializeRequestData
"""
request_str = ''
if isinstance(object_data, dict):
if len(object_data) == 0:
return request_str
if '__name__' not in object_data:
raise Exception('CloudShell API', "Object data doesn't have '__name__' attribute!")
request_str += '<' + object_data['__name__'] + '>\n'
for key, value in object_data.items():
if value is None or key == '__name__':
continue
request_str += '<' + key + '>' + self._serializeRequestData(value) + '</' + key + '>\n'
request_str += '</' + object_data['__name__'] + '>\n'
elif isinstance(object_data, list):
request_str += '\n'
for value in object_data:
request_str += self._serializeRequestData(value, list())
elif isinstance(object_data, basestring) or isinstance(object_data, int) or isinstance(object_data, float):
if prev_type is not None and isinstance(prev_type, list):
request_str += '<string>' + self._replaceSendValue(str(object_data)) + '</string>\n'
else:
request_str += self._replaceSendValue(str(object_data))
return request_str
def _new_serializeRequestData(self, root_node, object_data, prev_type=None):
"""Generate xml from received request data using etree.xml
"""
if isinstance(object_data, dict):
if '__name__' in object_data:
working_node = etree.SubElement(root_node, object_data.pop('__name__'))
else:
working_node = root_node
for key, value in object_data.items():
if value is None:
continue
if isinstance(value, basestring):
new_node = etree.SubElement(working_node, key)
new_node.text = value
else:
child_node = working_node
if (isinstance(value, list)):
child_node = etree.SubElement(working_node, key)
serialized_node = self._new_serializeRequestData(child_node, value)
return root_node
elif isinstance(object_data, list):
for value in object_data:
serialized_node = self._new_serializeRequestData(root_node, value, list())
elif isinstance(object_data, basestring) or isinstance(object_data, int) or isinstance(object_data, float):
if prev_type is not None and isinstance(prev_type, list):
child_node = etree.SubElement(root_node, 'string')
child_node.text = object_data
elif isinstance(object_data, bool):
root_node.text = str(object_data).lower()
else:
root_node.text = self._to_unicode_string(object_data)
return root_node
[docs] def generateAPIRequest(self, kwargs):
"""
Generic method for generation and sending XML requests
:param return_type: type of returning data
:param kwargs: map of the parameters that need to be send to the server
:return: string data or API object
"""
if 'method_name' not in kwargs:
raise CloudShellAPIError(404, 'Key "method_name" not in input data!', '')
method_name = kwargs.pop('method_name', None)
request_node = etree.Element(method_name)
#request_str = '<' + method_name + '>\n'
for name in kwargs:
child_node = etree.SubElement(request_node, name)
self._new_serializeRequestData(child_node, kwargs[name])
response_str = self._sendRequest(self.username, self.domain, method_name, etree.tostring(request_node))
response_str = response_str.replace('xmlns="http://schemas.qualisystems.com/ResourceManagement/ApiCommandResult.xsd"', '') \
.replace('�', '<NUL>')
response_xml = XMLWrapper.parseXML(response_str)
api_result = CommonApiResult(response_xml)
if not api_result.success:
raise CloudShellAPIError(api_result.error_code, api_result.error, response_str)
if api_result.response_info is None:
return response_str
else:
return api_result.response_info
def __prettify_xml(self, elem):
"""Return a pretty-printed XML string for the Element.
"""
from xml.dom.minidom import parse, parseString
rough_string = etree.tostring(elem, 'utf-8')
reparsed = parseString(rough_string)
return reparsed.toprettyxml(indent="\t")