Source code for punx.schema_manager

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#-----------------------------------------------------------------------------
# :author:    Pete R. Jemian
# :email:     prjemian@gmail.com
# :copyright: (c) 2016-2017, Pete R. Jemian
#
# Distributed under the terms of the Creative Commons Attribution 4.0 International Public License.
#
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------

"""
manages the XML Schema of this project

The *schema_manager* calls the *cache_manager* and
is called by *nxdl_manager*.

Public

.. autosummary::
   
   ~SchemaManager
   ~Schema_Root
   ~Schema_Attribute 
   ~Schema_Element 
   ~Schema_Type 
   ~get_default_schema_manager
   ~raise_error
   ~strip_ns

Internal

.. autosummary::
   
   ~_Mixin
   ~_GroupParsing
   ~_Recursion

"""


from __future__ import print_function

import lxml.etree
import os
from . import NAMESPACE_DICT, FileNotFound, InvalidNxdlFile
from . import singletons
from . import utils


logger = utils.setup_logger(__name__)


def strip_ns(ref):
    """
    strip the namespace prefix from ``ref``

    Everything up to and including the last colon is discarded.
    A string without any colon is returned unchanged.

    :param str ref: one word, colon delimited string, such as *nx:groupGroup*
    :returns str: the part to the right of the last colon
    """
    _prefix, _colon, local_name = ref.rpartition(':')
    return local_name
def raise_error(node, text, obj):
    """
    standard *ValueError* exception handling

    Builds the message ``line <sourceline>: <text><obj>`` and raises it.

    :param obj node: object providing a ``sourceline`` attribute
    :param str text: label for ``obj``
    :param str obj: value (converted with ``str()``)
    :raises ValueError: always
    """
    location = 'line ' + str(node.sourceline)
    detail = text + str(obj)
    raise ValueError(': '.join((location, detail)))
def get_default_schema_manager():
    """
    internal: convenience function

    Creates a ``cache_manager.CacheManager`` and returns the
    ``schema_manager`` of its ``default_file_set``.

    :returns: ``schema_manager`` attribute of the default file set
    :raises ValueError: if the cache manager or its default file set
        is not available
    """
    # imported here (not at module level), as in the rest of this module
    from punx import cache_manager

    cm = cache_manager.CacheManager()
    # explicit check instead of ``assert``: assertions are removed when
    # Python runs with -O, which would hide the real problem
    if cm is None or cm.default_file_set is None:
        raise ValueError('Could not get NXDL file set from the cache')
    return cm.default_file_set.schema_manager
class SchemaManager(object):
    """
    describes the XML Schema for the NeXus NXDL definitions files

    :param str path: optional, directory containing ``nxdl.xsd``;
        default is the ``path`` of the cache manager's default file set
    :raises ValueError: if ``path`` is not given and no default file set
        can be obtained from the cache
    :raises FileNotFound: if ``nxdl.xsd`` does not exist in ``path``
    :raises InvalidNxdlFile: if the schema root does not have exactly
        one ``xs:element`` child

    After construction these attributes are available:

    * ``schema_file`` : full name of the ``nxdl.xsd`` file
    * ``lxml_schema`` : ``lxml.etree.XMLSchema`` built from that file
    * ``nxdl`` : :class:`Schema_Root` built from the one ``xs:element``

    ``lxml_tree`` and ``lxml_root`` exist only while the constructor runs.
    """

    ns = NAMESPACE_DICT

    def __init__(self, path=None):
        from punx import cache_manager
        if path is None:
            cm = cache_manager.CacheManager()
            if cm is None or cm.default_file_set is None:
                raise ValueError('Could not get NXDL file set from the cache')
            path = cm.default_file_set.path

        schema_file = os.path.join(path, 'nxdl.xsd')
        if not os.path.exists(schema_file):
            raise FileNotFound(schema_file)
        self.schema_file = schema_file

        self.lxml_tree = lxml.etree.parse(self.schema_file)
        self.lxml_schema = lxml.etree.XMLSchema(self.lxml_tree)
        self.lxml_root = self.lxml_tree.getroot()

        nodes = self.lxml_root.xpath('xs:element', namespaces=self.ns)
        if len(nodes) != 1:
            raise InvalidNxdlFile(self.schema_file)

        # Schema_Root calls back into parse_nxdlTypes() and
        # parse_nxdl_patterns() while self.lxml_root is still set
        self.nxdl = Schema_Root(
            nodes[0],
            ns_dict=self.ns,
            schema_root=self.lxml_root,
            schema_manager=self)

        # cleanup these internal structures
        del self.lxml_root
        #del self.lxml_schema    # needed for XML file validation
        del self.lxml_tree

    def parse_nxdl_patterns(self):
        """
        get regexp patterns for validItemName, validNXClassName,
        & validTargetName from nxdl.xsd

        Every top-level ``xs:simpleType`` whose name starts with *valid*
        is examined.  A pattern whose base is another pattern in the
        result takes over that base's ``base`` and ``maxLength`` and
        appends that base's regular expressions to its own.

        :returns dict: :class:`Schema_pattern` objects, keyed by name
        """
        # __init__ deletes self.lxml_root when construction is finished;
        # read the schema file again so this method also works afterwards
        root = getattr(self, 'lxml_root', None)
        if root is None:
            root = lxml.etree.parse(self.schema_file).getroot()

        db = {}
        for node in root.xpath('xs:simpleType', namespaces=self.ns):
            key = node.attrib['name']
            if not key.startswith('valid'):
                continue

            obj = Schema_pattern()
            obj.pattern_name = key
            db[key] = obj

            subnodes = node.xpath('xs:restriction', namespaces=self.ns)
            assert(len(subnodes) == 1)
            restriction = subnodes[0]
            obj.base = strip_ns(restriction.attrib['base'])
            for item in restriction:
                if isinstance(item, lxml.etree._Comment):
                    continue
                if item.tag.endswith('}pattern'):
                    obj.re_list.append(item.attrib['value'])
                elif item.tag.endswith('}maxLength'):
                    obj.maxLength = int(item.attrib['value'])

        # adjust for any restrictions with NeXus base
        for v in db.values():
            if v.base != 'token' and v.base in db:
                base = db[v.base]
                v.base = base.base
                v.maxLength = base.maxLength
                v.re_list += base.re_list

        return db

    def parse_nxdlTypes(self):
        """
        get the allowed data types and unit types from nxdlTypes.xsd

        ``nxdlTypes.xsd`` is read from the directory of ``schema_file``
        (or, if that file does not exist, from the cache manager's
        default file set).  Its name is kept in ``self.types_file``.

        :returns tuple: ``(db, units)`` where ``db`` is a dict of
            :class:`Schema_nxdlType` keyed by type name (without the
            *anyUnitsAttr* and *primitiveType* entries) and ``units``
            is a list made from the ``values`` of *anyUnitsAttr*
        :raises ValueError: if the cache must be used and it provides
            no default file set
        :raises FileNotFound: if ``nxdlTypes.xsd`` does not exist
        """
        if os.path.exists(self.schema_file):
            path = os.path.dirname(self.schema_file)
        else:
            from punx import cache_manager
            cm = cache_manager.CacheManager()
            if cm is None or cm.default_file_set is None:
                raise ValueError('Could not get NXDL file set from the cache')
            path = cm.default_file_set.path

        self.types_file = os.path.join(path, 'nxdlTypes.xsd')
        if not os.path.exists(self.types_file):
            raise FileNotFound(self.types_file)
        lxml_types_tree = lxml.etree.parse(self.types_file)

        db = {}
        root = lxml_types_tree.getroot()
        for node in root:
            # comments are tested first: only real elements have a string tag
            if isinstance(node, lxml.etree._Comment) or node.tag.endswith('}annotation'):
                continue
            obj = Schema_nxdlType(node, ns_dict=self.ns, schema_root=root)
            if obj.name is not None:
                db[obj.name] = obj

        # re-arrange
        units = list(db['anyUnitsAttr'].values or [])
        del db['anyUnitsAttr']
        del db['primitiveType']

        return db, units
class Schema_pattern(object):
    """
    describe the regular expression patterns for names of NeXus things

    Attributes (all set to defaults here, filled in by the caller):

    * ``pattern_name`` : name of the pattern, default ``None``
    * ``base`` : name of the base type, default ``token``
    * ``maxLength`` : maximum length, default ``-1`` (unlimited)
    * ``re_list`` : list of regular expression strings, default empty
    """

    def __init__(self):
        self.pattern_name = None
        self.base = 'token'
        self.maxLength = -1     # unlimited
        self.re_list = []
class Schema_nxdlType(object):
    """
    one of the types defined in the file *nxdlTypes.xsd*

    :param obj xml_obj: XML node describing the type; its children are
        examined for ``list``, ``restriction`` and ``union`` content
    :param dict ns_dict: optional, accepted but not used here
    :param obj schema_root: optional, stored as ``self.schema_root``
    :raises ValueError: (from :func:`raise_error`) for any other child tag

    Attributes:

    * ``name`` : value of the *name* attribute (or ``None``)
    * ``restriction`` : base type of an ``xs:restriction`` (or ``None``)
    * ``values`` : list with the item type of an ``xs:list`` or with the
      enumeration values of an ``xs:restriction`` (or ``None``)
    * ``union`` : list of member types of an ``xs:union`` (or ``None``)
    """

    def __init__(self, xml_obj, ns_dict=None, schema_root=None):
        self.name = xml_obj.attrib.get('name')
        self.restriction = None
        self.union = None
        self.values = None
        self.schema_root = schema_root
        self.attrs = {}

        for node in xml_obj:
            if isinstance(node, lxml.etree._Comment):
                continue
            tag = node.tag
            if tag.endswith('}annotation'):
                continue

            # real lists are built below: on Python 3, map() gives a
            # one-shot iterator that is empty after its first use and
            # always tests as true
            if tag.endswith('}list'):
                self.values = [strip_ns(node.attrib['itemType'])]
            elif tag.endswith('}restriction'):
                self.restriction = strip_ns(node.attrib['base'])
                self.values = [
                    subnode.attrib['value']
                    for subnode in node
                    if not isinstance(subnode, lxml.etree._Comment)
                    and subnode.tag.endswith('}enumeration')
                ]
            elif tag.endswith('}union'):
                self.union = [
                    strip_ns(member)
                    for member in node.attrib['memberTypes'].split()
                ]
            else:
                raise_error(node, 'unhandled tag=', tag)
class _Mixin(object): """ common code for NXDL Rules classes below :param lxml.etree.Element xml_obj: XML element :param str obj_name: optional, default taken from ``xml_obj`` :param dict ns_dict: optional, default taken from :data:`__init__.NAMESPACE_DICT` :param obj schema_root: optional, instance of lxml.etree._Element """ def __init__(self, xml_obj, obj_name=None, ns_dict=None, schema_root=None): self.name = obj_name or xml_obj.attrib.get('name') self.ns = ns_dict or NAMESPACE_DICT self.lxml_root = schema_root def get_named_node(self, tag, attribute, value): """ return a named node from the XML Schema :param str tag: XML Schema tag (such as "complexType") to match :param str attribute: attribute name to match :param str value: attribute value to match """ if self.lxml_root is None: raise ValueError root = self.lxml_root xpath_str = 'xs:' + tag xpath_str += '[@' + attribute xpath_str += '="' + value + '"]' node_list = root.xpath(xpath_str, namespaces=self.ns) if len(node_list) != 1: msg = 'wrong number of ' + tag msg += ' nodes found: ' + str(len(node_list)) raise ValueError(msg) return node_list[0] def copy_to(self, target): """ copy results into target object :param obj target: instance of _Mixin, such as Schema_Element """ for k, v in self.attrs.items(): target.attrs[k] = v for k, v in self.children.items(): target.children[k] = v def parse_attribute(self, node): """ """ obj = Schema_Attribute(node, schema_root=self.lxml_root) self.attrs[obj.name] = obj def parse_attributeGroup(self, node): """ """ obj = Schema_Type(node.attrib.get('ref'), schema_root=self.lxml_root) obj.copy_to(self) def parse_complexContent(self, node): """ """ for subnode in node: if subnode.tag.endswith('}extension'): ref = subnode.attrib.get('base') if ref not in ('nx:basicComponent'): raise_error(subnode, 'unexpected base=', ref) obj = Schema_Type(ref, schema_root=self.lxml_root) obj.copy_to(self) # parse children of extension node for obj_node in subnode: if 
obj_node.tag.endswith('}annotation'): pass elif obj_node.tag.endswith('}attribute'): self.parse_attribute(obj_node) elif obj_node.tag.endswith('}sequence'): self.parse_sequence(obj_node) else: raise_error(obj_node, 'unexpected base=', obj_node.tag) else: raise_error(subnode, 'unexpected tag=', subnode.tag) def parse_group(self, node): """ """ obj = Schema_Type(node.attrib.get('ref'), schema_root=self.lxml_root) obj.copy_to(self)
class Schema_Root(_Mixin):
    """
    root element of the nxdl.xsd file

    :param lxml.etree.Element element_node: the root ``xs:element`` node
    :param str obj_name: optional, default taken from ``element_node``
    :param dict ns_dict: optional, default taken from :data:`NAMESPACE_DICT`
    :param obj schema_root: optional, instance of lxml.etree._Element
    :param obj schema_manager: optional, object providing
        ``parse_nxdlTypes()`` and ``parse_nxdl_patterns()``
    :raises ValueError: (from :func:`raise_error`) if ``element_node``
        has no *type* attribute or its type has unhandled content

    Attributes:

    * ``attrs`` : dict of :class:`Schema_Attribute`, keyed by name
    * ``children`` : dict of child objects, keyed by name
    * ``types``, ``units``, ``patterns`` : results from ``schema_manager``
      (remain ``None`` when no ``schema_manager`` is given)
    * ``schema_types`` : dict with ``definition`` (this object)
      and every entry of ``children``
    """

    patterns = None
    type = None
    types = None
    units = None

    def __init__(self, element_node, obj_name=None, ns_dict=None, schema_root=None, schema_manager=None):
        _Mixin.__init__(
            self,
            element_node,
            obj_name=obj_name,
            ns_dict=ns_dict,
            schema_root=schema_root)
        # created for each instance: dicts declared on the class would be
        # shared (and filled) by every Schema_Root object
        self.attrs = {}
        self.children = {}
        self.schema_manager = schema_manager

        element_type = element_node.attrib.get('type')
        if element_type is None:
            element_name = element_node.attrib.get('name')
            raise_error(element_node, 'no @type for element node: ', element_name)

        ref = strip_ns(element_type)
        type_node = self.get_named_node('complexType', 'name', ref)

        for node in type_node:
            if node.tag.endswith('}attribute'):
                self.parse_attribute(node)
            elif node.tag.endswith('}attributeGroup'):
                self.parse_attributeGroup(node)
            elif node.tag.endswith('}sequence'):
                self.parse_sequence(node)
            elif node.tag.endswith('}annotation'):
                pass
            else:
                raise_error(node, 'unhandled tag=', node.tag)

        if schema_manager is not None:
            self.types, self.units = schema_manager.parse_nxdlTypes()
            self.patterns = schema_manager.parse_nxdl_patterns()

        self.schema_types = dict(definition=self)       # FIXME:
        self.schema_types.update(self.children)

    def parse_sequence(self, seq_node):
        """
        parse the sequence used in the root element

        Each ``xs:element`` becomes a :class:`Schema_Element` in
        ``self.children``; the content of each referenced ``xs:group``
        is copied into this object.

        :param obj seq_node: ``xs:sequence`` XML node
        :raises ValueError: (from :func:`raise_error`) for any other tag
        """
        for node in seq_node:
            if node.tag.endswith('}element'):
                obj = Schema_Element(node, schema_root=self.lxml_root)
                self.children[obj.name] = obj
            elif node.tag.endswith('}group'):
                obj = Schema_Type(node.attrib.get('ref'), schema_root=self.lxml_root)
                obj.copy_to(self)
            else:
                msg = 'unhandled tag in ``definitionType``: '
                raise_error(node, msg, node.tag)
class Schema_Attribute(_Mixin):
    """
    xs:attribute element

    :param lxml.etree.Element xml_obj: XML element
    :param str obj_name: optional, default taken from ``xml_obj``
    :param dict ns_dict: optional, default taken from :data:`NAMESPACE_DICT`
    :param obj schema_root: optional, instance of lxml.etree._Element

    Attributes:

    * ``required`` : ``True`` if *use* is ``required``
    * ``type`` : value of the *type* attribute, default ``str``
    * ``default_value`` : value of the *default* attribute (``None`` if
      absent); for type *nx:NX_BOOLEAN* with a default, a bool
    * ``enum`` : list of enumeration values from the inline simple type
    * ``patterns`` : list of pattern values from the inline simple type
    """

    def __init__(self, xml_obj, obj_name=None, ns_dict=None, schema_root=None):
        assert(xml_obj is not None)
        assert(xml_obj.tag == '{'+xml_obj.nsmap['xs']+'}attribute')
        _Mixin.__init__(
            self,
            xml_obj,
            obj_name=obj_name,
            ns_dict=ns_dict,
            schema_root=schema_root)

        use = xml_obj.attrib.get('use', 'optional')
        self.required = use == 'required'
        self.type = xml_obj.attrib.get('type', 'str')

        declared_default = xml_obj.attrib.get('default')
        if self.type == 'nx:NX_BOOLEAN' and declared_default is not None:
            # compare with the *string* '1': an attribute value is never
            # equal to the integer 1
            self.default_value = declared_default.lower() in ('true', 'y', '1')
        else:
            # also reached for a boolean without @default: stays None
            self.default_value = declared_default

        self.enum = self._restriction_values(xml_obj, 'enumeration')
        self.patterns = self._restriction_values(xml_obj, 'pattern')

    def _restriction_values(self, xml_obj, facet):
        """
        list the *value* of each ``xs:simpleType/xs:restriction/xs:<facet>``

        :param obj xml_obj: ``xs:attribute`` XML node
        :param str facet: ``enumeration`` or ``pattern``
        :returns list: the values found, in document order
        """
        xpath_str = 'xs:simpleType/xs:restriction/xs:' + facet
        return [
            node.attrib['value']
            for node in xml_obj.xpath(xpath_str, namespaces=self.ns)
            if node.attrib.get('value') is not None
        ]

    def __str__(self, *args, **kwargs):
        try:
            s = '@' + self.name
            s += ' : ' + self.type
            if self.enum:
                s += ' = '
                s += ' | '.join(self.enum)
            return s
        except Exception:
            # best effort: fall back to the default text representation
            return _Mixin.__str__(self, *args, **kwargs)
class Schema_Element(_Mixin):
    """
    xs:element

    :param lxml.etree.Element xml_obj: XML element
    :param str obj_name: optional, default taken from ``xml_obj``
    :param dict ns_dict: optional, default taken from :data:`NAMESPACE_DICT`
    :param obj schema_root: optional, instance of lxml.etree._Element
    :raises ValueError: (from :func:`raise_error`) if an element without
        *type* has a child other than ``complexType`` or ``annotation``

    Attributes:

    * ``type`` : value of the *type* attribute (or ``None``)
    * ``attrs`` : dict of :class:`Schema_Attribute`, keyed by name
    * ``children`` : dict of child objects, keyed by name

    :see: http://download.nexusformat.org/doc/html/nxdl.html
    :see: http://download.nexusformat.org/doc/html/nxdl_desc.html#nxdl-elements
    """

    def __init__(self, xml_obj, obj_name=None, ns_dict=None, schema_root=None):
        _Mixin.__init__(
            self,
            xml_obj,
            obj_name=obj_name,
            ns_dict=ns_dict,
            schema_root=schema_root)
        self.children = {}
        self.attrs = {}

        # read & analyze the NXDL structural *type* referenced by *ref*
        ref = self.type = xml_obj.attrib.get('type')
        if ref is None:
            self._parse_inline_content(xml_obj)
            return

        # avoid known infinite recursion: group may contain group(s)
        ok_to_parse = True
        if xml_obj.attrib.get('name') == 'group' and ref == 'nx:groupType':
            # NOTE(review): this flag is never reset in this module, so
            # only the first *group* met is parsed in full -- confirm
            # that is intended when several schemas are loaded
            if _GroupParsing().started:
                ok_to_parse = False
                # needs a special code to apply this rule
                #   isinstance(obj, _Recursion)
                self.children['group'] = _Recursion('group')
            _GroupParsing().started = True

        if ok_to_parse:
            type_obj = Schema_Type(ref, schema_root=self.lxml_root)
            type_obj.copy_to(self)

    def _parse_inline_content(self, xml_obj):
        """
        collect the attributes declared in an inline ``xs:complexType``

        :param obj xml_obj: ``xs:element`` XML node without *type*
        """
        for node in xml_obj:
            if node.tag.endswith('}complexType'):
                # every xs:attribute child is kept: find() returned only
                # the first one (or None when there was none at all)
                for attr_node in node.findall('xs:attribute', self.ns):
                    a = Schema_Attribute(attr_node, schema_root=self.lxml_root)
                    self.attrs[a.name] = a
            elif node.tag.endswith('}annotation'):
                pass
            else:
                raise_error(node, 'unhandled tag=', node.tag)
class Schema_Type(_Mixin):
    """
    a named NXDL structure type (such as groupGroup)

    The node is looked up by name among the children of the schema
    root, then its content is parsed into ``attrs`` and ``children``.

    :param str ref: name of NXDL structure type (such as ``groupGroup``)
    :param str tag: XML Schema element tag, such as complexType (default=``*``)
    :param obj schema_root: optional, instance of lxml.etree._Element
    :raises ValueError: if the named node is not found exactly once or
        (from :func:`raise_error`) if it has unexpected content

    :see: http://download.nexusformat.org/doc/html/nxdl.html
    :see: http://download.nexusformat.org/doc/html/nxdl_desc.html#nxdl-data-types-internal
    """

    def __init__(self, ref, tag = '*', schema_root=None):
        # _Mixin.__init__(self, xml_obj)
        # do the _Mixin.__init__ directly here
        self.ns = NAMESPACE_DICT
        self.lxml_root = schema_root
        type_node = self.get_named_node(tag, 'name', strip_ns(ref))
        self.name = type_node.attrib.get('name')
        self.attrs = {}
        self.children = {}

        # (tag ending, parser) pairs, tried in this order
        parsers = (
            ('}attribute', self.parse_attribute),
            ('}attributeGroup', self.parse_attributeGroup),
            ('}complexContent', self.parse_complexContent),
            ('}group', self.parse_group),
            ('}sequence', self.parse_sequence),
        )

        for child in type_node:
            # comments first: they have no string tag to test
            if isinstance(child, lxml.etree._Comment):
                continue
            if child.tag.endswith('}annotation'):
                continue
            for ending, parser in parsers:
                if child.tag.endswith(ending):
                    parser(child)
                    break
            else:
                raise_error(child, 'unexpected tag=', child.tag)

    def parse_sequence(self, node):
        """
        add a :class:`Schema_Element` to ``self.children`` for each
        ``xs:element`` and ``xs:group`` in the sequence

        :param obj node: ``xs:sequence`` XML node
        :raises ValueError: (from :func:`raise_error`) for a tag other
            than ``element``, ``group`` or ``any``
        """
        for item in node:
            if item.tag.endswith(('}element', '}group')):
                child = Schema_Element(item, schema_root=self.lxml_root)
                self.children[child.name] = child
            elif item.tag.endswith('}any'):
                # do not process this one, only used for documentation
                continue
            else:
                raise_error(item, 'unexpected tag=', item.tag)
class _GroupParsing(singletons.Singleton):
    """
    internal: avoid a known recursion of group in a group

    ``started`` is ``False`` here; :class:`Schema_Element` sets it to
    ``True`` when it meets a *group* element of type *nx:groupType*.
    """

    started = False


class _Recursion(_Mixin):
    """
    internal: an element used in recursion, such as child group of group

    Only the ``_Mixin`` attributes are set (no XML node, no schema root).

    :param str obj_name: name of this element
    """

    def __init__(self, obj_name):
        super(_Recursion, self).__init__(None, obj_name=obj_name, ns_dict=None)


# if __name__ == '__main__':
#     sm = SchemaManager()
#     _breakpoint = True