Source code for rinse.util

"""rinse SOAP client utility functions."""
from __future__ import print_function
import collections
import os.path
import pprint
import textwrap

import defusedxml.lxml
import lxml.builder
from lxml import etree

RINSE_DIR = os.path.dirname(__file__)
ENVELOPE_XSD = 'soap-1.1_envelope.xsd'

NS_SOAPENV = 'http://schemas.xmlsoap.org/soap/envelope/'

NS_MAP = {
    'soapenv': NS_SOAPENV,
}


[docs]def element_as_tree(element): """Convert an element from within an ElementTree to its own tree.""" # XXX: this is a crude hack, but it works - got any better ideas? return safe_parse_string( etree.tostring( etree.ElementTree(element), ), )
[docs]def safe_parse_string(raw_xml, **kwargs): """Safely parse raw XML content into an element tree.""" return defusedxml.lxml.fromstring(raw_xml, **kwargs)
[docs]def safe_parse_path(xml_path, **kwargs): """Safely parse XML content from path into an element tree.""" return defusedxml.lxml.parse(xml_path, **kwargs)
[docs]def safe_parse_url(xml_url, **kwargs): """Safely parse XML content from path into an element tree.""" return defusedxml.lxml.parse(xml_url, **kwargs)
[docs]class ElementMaker(lxml.builder.ElementMaker): """Wrapper around lxml ElementMaker that casts ints as strings.""" def __getattr__(self, name): """Return a lambda that parses int args as strings.""" _cls = super(ElementMaker, self).__getattr__(name) def __cls_wraper(*args, **kwargs): """Wrapper around Element class.""" return _cls( *[ str(arg) if isinstance(arg, int) else arg for arg in args ], **kwargs ) return __cls_wraper
RinseResponse = collections.namedtuple('RinseResponse', ['response', 'doc'])
[docs]class SchemaCache(collections.defaultdict): """Cache of lxml.etree.XMLSchema instances, keyed by XSD basename."""
[docs] def get(self, xsd, xpath=None, namespaces=None): """Generate XMLSchema instances as specified.""" if xsd.startswith('/'): pass # absolute path elif ':' in xsd: pass # URL - defused should help protect us. else: # assume XSD is in res/ subdir of rinse project. xsd = os.path.join(RINSE_DIR, 'res', xsd) doc = safe_parse_path(xsd) if xpath: doc = doc.xpath(xpath, namespaces=namespaces)[0] self[xsd] = schema = etree.XMLSchema(doc) return schema
def __missing__(self, xsd): """Generate XMLSchema instances on demand.""" return self.get(xsd)
SCHEMA = SchemaCache()
[docs]class ElementMakerCache(collections.defaultdict): """Cache of ElementMaker instances for the given nsmap.""" def __init__(self, nsmap): """Keep reference to the nsmap we're given.""" super(ElementMakerCache, self).__init__() self.nsmap = { name: url for name, url in list(NS_MAP.items()) + list(nsmap.items()) } def __missing__(self, ns_prefix): """Generate ElementMaker instances as required.""" return ElementMaker( namespace=self.nsmap[ns_prefix], nsmap=self.nsmap, )
PRETTY_PARSER = etree.XMLParser( remove_blank_text=True, ) PRETTY_TEXT_WRAPPER = textwrap.TextWrapper( width=78, initial_indent='', subsequent_indent=' ', replace_whitespace=False, drop_whitespace=False, break_long_words=False, break_on_hyphens=False, )
[docs]def printxml(doc): """Pretty print an lxml document tree. The XML printed may not be exactly equivalent to the doc provided, as blank text within elements will be stripped to allow etree.tostring() to work with the 'pretty_print' option set. """ pretty_tree = safe_parse_string( etree.tostring(doc), parser=PRETTY_PARSER, ) pretty_xml = etree.tostring( pretty_tree, pretty_print=True, encoding='unicode', ).replace('\t', ' ').rstrip('\n') for line in pretty_xml.split('\n'): line = PRETTY_TEXT_WRAPPER.fill(line.rstrip('\n')).rstrip('\n') for subline in line.split('\n'): if not subline.strip(): continue print(subline)
[docs]def recursive_dict(element): """Map an XML tree into a dict of dicts.""" if isinstance(element, (tuple, list)): return tuple( recursive_dict(child) for child in element ) return ( '{}{}'.format( element.tag, pprint.pformat(element.attrib, compact=True, width=10000), ), dict( map(recursive_dict, element) ) or element.text )
[docs]class cached_property(object): def __init__(self, func): self.func = func self.__doc__ = getattr(func, '__doc__') def __get__(self, instance, owner): if instance is None: return self result = instance.__dict__[self.func.__name__] = self.func(instance) return result