#!/usr/bin/env python
# -*- coding: utf-8 -*-
#-----------------------------------------------------------------------------
# :author: Pete R. Jemian
# :email: prjemian@gmail.com
# :copyright: (c) 2016-2017, Pete R. Jemian
#
# Distributed under the terms of the Creative Commons Attribution 4.0 International Public License.
#
# The full license is in the file LICENSE.txt, distributed with this software.
#-----------------------------------------------------------------------------
"""
Manages the XML Schema (``nxdl.xsd`` and ``nxdlTypes.xsd``) used by this project.
The *schema_manager* module calls the *cache_manager* module and
is called by the *nxdl_manager* module.
Public
.. autosummary::
~SchemaManager
~Schema_Root
~Schema_Attribute
~Schema_Element
~Schema_Type
~get_default_schema_manager
~raise_error
~strip_ns
Internal
.. autosummary::
~_Mixin
~_GroupParsing
~_Recursion
"""
from __future__ import print_function
import lxml.etree
import os
from . import NAMESPACE_DICT, FileNotFound, InvalidNxdlFile
from . import singletons
from . import utils
# module-level logger, obtained from the project's ``utils.setup_logger()``
logger = utils.setup_logger(__name__)
def strip_ns(ref):
    """
    remove the namespace prefix from ``ref``

    :param str ref: colon-delimited name, such as *nx:groupGroup*
    :returns str: the text that follows the last colon
        (all of ``ref`` when it contains no colon)
    """
    _prefix, _colon, local_name = ref.rpartition(':')
    return local_name
def raise_error(node, text, obj):
    """
    raise a *ValueError* that reports the source line of ``node``

    The message has the form ``line <N>: <text><obj>``.

    :param obj node: object with a ``sourceline`` attribute
        (the callers in this module pass XML nodes)
    :param str text: label for ``obj``
    :param obj obj: value to report, converted with ``str()``
    :raises ValueError: always
    """
    parts = ['line ', str(node.sourceline), ': ', text, str(obj)]
    raise ValueError(''.join(parts))
def get_default_schema_manager():
    """
    internal: convenience function

    Return the ``schema_manager`` attribute of the default file set
    reported by ``punx.cache_manager.CacheManager``.

    :returns: ``cm.default_file_set.schema_manager``
    :raises ValueError: the cache manager or its default file set is ``None``
    """
    # deferred import, same as done in SchemaManager.__init__()
    from punx import cache_manager

    cm = cache_manager.CacheManager()
    # explicit test instead of ``assert``: assertions are removed by
    # ``python -O`` and the failure would then surface as an AttributeError
    if cm is None or cm.default_file_set is None:
        raise ValueError('Could not get NXDL file set from the cache')
    return cm.default_file_set.schema_manager
class SchemaManager(object):
    """
    describes the XML Schema for the NeXus NXDL definitions files

    On construction, ``nxdl.xsd`` is parsed and the result is stored as
    ``self.nxdl`` (instance of :class:`Schema_Root`).  The compiled
    ``lxml.etree.XMLSchema`` is kept as ``self.lxml_schema``; the parsed
    tree and its root are deleted once ``self.nxdl`` has been built.

    :param str path: directory containing ``nxdl.xsd``
        (default: path of the default file set from the cache manager)
    :raises ValueError: no default file set is available from the cache
    :raises FileNotFound: ``nxdl.xsd`` does not exist in ``path``
    :raises InvalidNxdlFile: the schema root does not have exactly
        one ``xs:element`` child
    """

    ns = NAMESPACE_DICT

    def __init__(self, path=None):
        if path is None:
            path = self._default_file_set_path()

        schema_file = os.path.join(path, 'nxdl.xsd')
        if not os.path.exists(schema_file):
            raise FileNotFound(schema_file)
        self.schema_file = schema_file

        self.lxml_tree = lxml.etree.parse(self.schema_file)
        self.lxml_schema = lxml.etree.XMLSchema(self.lxml_tree)
        self.lxml_root = self.lxml_tree.getroot()

        nodes = self.lxml_root.xpath('xs:element', namespaces=self.ns)
        if len(nodes) != 1:
            raise InvalidNxdlFile(self.schema_file)

        # Schema_Root calls back into parse_nxdlTypes() and
        # parse_nxdl_patterns() while self.lxml_root is still available
        self.nxdl = Schema_Root(
            nodes[0],
            ns_dict=self.ns,
            schema_root=self.lxml_root,
            schema_manager=self)

        # cleanup these internal structures
        del self.lxml_root
        #del self.lxml_schema    # needed for XML file validation
        del self.lxml_tree

    @staticmethod
    def _default_file_set_path():
        """
        internal: return the path of the default NXDL file set in the cache

        :raises ValueError: the cache has no default file set
        """
        from punx import cache_manager

        cm = cache_manager.CacheManager()
        if cm is None or cm.default_file_set is None:
            raise ValueError('Could not get NXDL file set from the cache')
        return cm.default_file_set.path

    def parse_nxdl_patterns(self):
        """
        get regexp patterns for validItemName, validNXClassName, & validTargetName from nxdl.xsd

        Every ``xs:simpleType`` whose name starts with ``valid`` is
        described by a :class:`Schema_pattern` object.

        :returns dict: :class:`Schema_pattern` objects, keyed by type name
        """
        root = getattr(self, 'lxml_root', None)
        if root is None:
            # __init__() deletes self.lxml_root when it finishes;
            # read the schema file again so this method remains usable
            root = lxml.etree.parse(self.schema_file).getroot()

        db = {}
        for node in root.xpath('xs:simpleType', namespaces=self.ns):
            key = node.attrib['name']
            if not key.startswith('valid'):
                continue

            obj = Schema_pattern()
            obj.pattern_name = key
            db[key] = obj

            subnodes = node.xpath('xs:restriction', namespaces=self.ns)
            assert(len(subnodes) == 1)
            restriction = subnodes[0]
            obj.base = strip_ns(restriction.attrib['base'])
            for item in restriction:
                if isinstance(item, lxml.etree._Comment):
                    continue
                if item.tag.endswith('}pattern'):
                    obj.re_list.append(item.attrib['value'])
                elif item.tag.endswith('}maxLength'):
                    obj.maxLength = int(item.attrib['value'])

        # adjust for any restrictions with NeXus base:
        # a pattern based on another pattern in ``db`` takes that
        # pattern's base & maxLength and appends its expressions
        for v in db.values():
            if v.base != 'token' and v.base in db:
                base = db[v.base]
                v.base = base.base
                v.maxLength = base.maxLength
                v.re_list += base.re_list

        return db

    def parse_nxdlTypes(self):
        """
        get the allowed data types and unit types from nxdlTypes.xsd

        The file is read from the directory of ``self.schema_file``
        or, if that file does not exist, from the default file set
        of the cache manager.

        :returns tuple: ``(db, units)`` where ``db`` is a dictionary of
            :class:`Schema_nxdlType` objects keyed by name
            (without ``anyUnitsAttr`` and ``primitiveType``) and
            ``units`` is the list of values from ``anyUnitsAttr``
        :raises ValueError: no default file set is available from the cache
        :raises FileNotFound: ``nxdlTypes.xsd`` was not found
        :raises KeyError: ``anyUnitsAttr`` or ``primitiveType`` is not defined
        """
        if os.path.exists(self.schema_file):
            path = os.path.dirname(self.schema_file)
        else:
            path = self._default_file_set_path()

        self.types_file = os.path.join(path, 'nxdlTypes.xsd')
        if not os.path.exists(self.types_file):
            raise FileNotFound(self.types_file)
        lxml_types_tree = lxml.etree.parse(self.types_file)

        db = {}
        root = lxml_types_tree.getroot()
        for node in root:
            if isinstance(node, lxml.etree._Comment):
                continue
            if node.tag.endswith('}annotation'):
                continue
            obj = Schema_nxdlType(node, ns_dict=self.ns, schema_root=root)
            if obj.name is not None:
                db[obj.name] = obj

        # re-arrange: units are reported separately from the data types
        units = list(db['anyUnitsAttr'].values or [])
        del db['anyUnitsAttr']
        del db['primitiveType']

        return db, units
class Schema_pattern(object):
    """
    describe the regular expression patterns for names of NeXus things

    A new object starts with these values:

    * ``base``: ``"token"``
    * ``pattern_name``: ``None``
    * ``re_list``: empty list (one per instance)
    * ``maxLength``: ``-1`` (unlimited)
    """

    def __init__(self):
        self.pattern_name = None
        self.base = 'token'
        self.maxLength = -1     # unlimited
        self.re_list = list()
class Schema_nxdlType(object):
    """
    one of the types defined in the file *nxdlTypes.xsd*

    :param obj xml_obj: XML node that defines the type
    :param dict ns_dict: accepted but not used by this class
    :param obj schema_root: stored as ``self.schema_root``
    :raises ValueError: (via :func:`raise_error`) a child node has a tag
        other than annotation, list, restriction, or union

    After construction:

    * ``name``: value of the ``name`` attribute (or ``None``)
    * ``restriction``: base of an ``xs:restriction`` child (or ``None``)
    * ``union``: list of member types of an ``xs:union`` child (or ``None``)
    * ``values``: list with the item type of an ``xs:list`` child *or*
      the enumeration values of an ``xs:restriction`` child (or ``None``)
    """

    def __init__(self, xml_obj, ns_dict=None, schema_root=None):
        self.name = xml_obj.attrib.get('name')
        self.restriction = None
        self.union = None
        self.values = None
        self.schema_root = schema_root
        self.attrs = {}

        for node in xml_obj:
            if isinstance(node, lxml.etree._Comment):
                continue
            tag = node.tag
            if tag.endswith('}annotation'):
                continue

            # Build real lists here.  ``map()`` returns a one-shot
            # iterator on Python 3, which is empty after its first use.
            if tag.endswith('}list'):
                self.values = [strip_ns(node.attrib['itemType'])]
            elif tag.endswith('}restriction'):
                self.restriction = strip_ns(node.attrib['base'])
                self.values = [
                    subnode.attrib['value']
                    for subnode in node
                    if not isinstance(subnode, lxml.etree._Comment)
                    and subnode.tag.endswith('}enumeration')
                ]
            elif tag.endswith('}union'):
                member_types = node.attrib['memberTypes'].split()
                self.union = [strip_ns(item) for item in member_types]
            else:
                raise_error(node, 'unhandled tag=', tag)
class _Mixin(object):
"""
common code for NXDL Rules classes below
:param lxml.etree.Element xml_obj: XML element
:param str obj_name: optional, default taken from ``xml_obj``
:param dict ns_dict: optional, default taken from :data:`__init__.NAMESPACE_DICT`
:param obj schema_root: optional, instance of lxml.etree._Element
"""
def __init__(self, xml_obj, obj_name=None, ns_dict=None, schema_root=None):
self.name = obj_name or xml_obj.attrib.get('name')
self.ns = ns_dict or NAMESPACE_DICT
self.lxml_root = schema_root
def get_named_node(self, tag, attribute, value):
"""
return a named node from the XML Schema
:param str tag: XML Schema tag (such as "complexType") to match
:param str attribute: attribute name to match
:param str value: attribute value to match
"""
if self.lxml_root is None:
raise ValueError
root = self.lxml_root
xpath_str = 'xs:' + tag
xpath_str += '[@' + attribute
xpath_str += '="' + value + '"]'
node_list = root.xpath(xpath_str, namespaces=self.ns)
if len(node_list) != 1:
msg = 'wrong number of ' + tag
msg += ' nodes found: ' + str(len(node_list))
raise ValueError(msg)
return node_list[0]
def copy_to(self, target):
"""
copy results into target object
:param obj target: instance of _Mixin, such as Schema_Element
"""
for k, v in self.attrs.items():
target.attrs[k] = v
for k, v in self.children.items():
target.children[k] = v
def parse_attribute(self, node):
""" """
obj = Schema_Attribute(node, schema_root=self.lxml_root)
self.attrs[obj.name] = obj
def parse_attributeGroup(self, node):
""" """
obj = Schema_Type(node.attrib.get('ref'), schema_root=self.lxml_root)
obj.copy_to(self)
def parse_complexContent(self, node):
""" """
for subnode in node:
if subnode.tag.endswith('}extension'):
ref = subnode.attrib.get('base')
if ref not in ('nx:basicComponent'):
raise_error(subnode, 'unexpected base=', ref)
obj = Schema_Type(ref, schema_root=self.lxml_root)
obj.copy_to(self)
# parse children of extension node
for obj_node in subnode:
if obj_node.tag.endswith('}annotation'):
pass
elif obj_node.tag.endswith('}attribute'):
self.parse_attribute(obj_node)
elif obj_node.tag.endswith('}sequence'):
self.parse_sequence(obj_node)
else:
raise_error(obj_node, 'unexpected base=', obj_node.tag)
else:
raise_error(subnode, 'unexpected tag=', subnode.tag)
def parse_group(self, node):
""" """
obj = Schema_Type(node.attrib.get('ref'), schema_root=self.lxml_root)
obj.copy_to(self)
class Schema_Root(_Mixin):
    """
    root element of the nxdl.xsd file

    The ``complexType`` named by ``element_node/@type`` is parsed into
    ``self.attrs`` and ``self.children``.  When a ``schema_manager`` is
    given, it also supplies ``self.types``, ``self.units``, and
    ``self.patterns``.

    :param lxml.etree.Element element_node: XML element
    :param str obj_name: optional, default taken from ``element_node``
    :param dict ns_dict: optional, default taken from :data:`NAMESPACE_DICT`
    :param obj schema_root: optional, instance of lxml.etree._Element
    :param obj schema_manager: optional, instance of :class:`SchemaManager`
    :raises ValueError: ``element_node`` has no ``type`` attribute,
        the named type is not found, or an unhandled tag is found
    """

    # defaults, for when no schema_manager is given
    patterns = None
    type = None
    types = None
    units = None

    def __init__(self, element_node, obj_name=None, ns_dict=None, schema_root=None, schema_manager=None):
        _Mixin.__init__(
            self,
            element_node,
            obj_name=obj_name,
            ns_dict=ns_dict,
            schema_root=schema_root)

        # Create these per instance.  As class attributes, the same two
        # dictionaries would be shared (and filled) by every Schema_Root.
        self.attrs = {}
        self.children = {}
        self.schema_manager = schema_manager

        element_type = element_node.attrib.get('type')
        if element_type is None:
            element_name = element_node.attrib.get('name')
            raise_error(element_node, 'no @type for element node: ', element_name)

        ref = strip_ns(element_type)
        type_node = self.get_named_node('complexType', 'name', ref)

        for node in type_node:
            if isinstance(node, lxml.etree._Comment):
                continue
            if node.tag.endswith('}attribute'):
                self.parse_attribute(node)
            elif node.tag.endswith('}attributeGroup'):
                self.parse_attributeGroup(node)
            elif node.tag.endswith('}sequence'):
                self.parse_sequence(node)
            elif node.tag.endswith('}annotation'):
                pass
            else:
                raise_error(node, 'unhandled tag=', node.tag)

        if schema_manager is not None:
            self.types, self.units = schema_manager.parse_nxdlTypes()
            self.patterns = schema_manager.parse_nxdl_patterns()

        self.schema_types = dict(definition=self)     # FIXME:
        self.schema_types.update(self.children)

    def parse_sequence(self, seq_node):
        """
        parse the sequence used in the root element

        Each ``xs:element`` becomes an entry of ``self.children``;
        the content of each referenced ``xs:group`` is copied into ``self``.

        :param obj seq_node: ``xs:sequence`` node
        :raises ValueError: (via :func:`raise_error`) unhandled tag
        """
        for node in seq_node:
            if isinstance(node, lxml.etree._Comment):
                continue
            if node.tag.endswith('}element'):
                obj = Schema_Element(node, schema_root=self.lxml_root)
                self.children[obj.name] = obj
            elif node.tag.endswith('}group'):
                self.parse_group(node)
            else:
                msg = 'unhandled tag in ``definitionType``: '
                raise_error(node, msg, node.tag)
class Schema_Attribute(_Mixin):
    """
    xs:attribute element

    After construction:

    * ``required``: ``True`` when ``@use`` is ``required``
    * ``type``: value of ``@type`` (default: ``str``)
    * ``default_value``: value of ``@default`` (or ``None``),
      converted to ``bool`` when the type is ``nx:NX_BOOLEAN``
    * ``enum``: list of allowed values from ``xs:enumeration``
    * ``patterns``: list of regular expressions from ``xs:pattern``

    :param lxml.etree.Element xml_obj: XML element
    :param str obj_name: optional, default taken from ``xml_obj``
    :param dict ns_dict: optional, default taken from :data:`NAMESPACE_DICT`
    :param obj schema_root: optional, instance of lxml.etree._Element
    """

    def __init__(self, xml_obj, obj_name=None, ns_dict=None, schema_root=None):
        assert(xml_obj is not None)
        assert(xml_obj.tag == '{'+xml_obj.nsmap['xs']+'}attribute')
        _Mixin.__init__(
            self,
            xml_obj,
            obj_name=obj_name,
            ns_dict=ns_dict,
            schema_root=schema_root)

        use = xml_obj.attrib.get('use', 'optional')
        self.required = use in ('required', )
        self.type = xml_obj.attrib.get('type', 'str')

        default = xml_obj.attrib.get('default')
        if default is not None and self.type in ('nx:NX_BOOLEAN',):
            # attribute values are text: compare with '1', not integer 1
            self.default_value = default.lower() in ('true', 'y', '1')
        else:
            # no conversion (stays None when no default is declared)
            self.default_value = default

        self.enum = self._facet_values(xml_obj, 'enumeration')
        self.patterns = self._facet_values(xml_obj, 'pattern')

    def _facet_values(self, xml_obj, facet):
        """
        internal: list ``@value`` of each ``xs:simpleType/xs:restriction/xs:<facet>``
        """
        xpath_str = 'xs:simpleType/xs:restriction/xs:' + facet
        found = []
        for node in xml_obj.xpath(xpath_str, namespaces=self.ns):
            v = node.attrib.get('value')
            if v is not None:
                found.append(v)
        return found

    def __str__(self, *args, **kwargs):
        """
        describe as ``@name : type`` plus `` = a | b`` for any enumeration
        """
        try:
            s = '@' + self.name
            s += ' : ' + self.type
            if self.enum:
                s += ' = '
                s += ' | '.join(self.enum)
            return s
        except Exception:
            # best effort: fall back to the default text representation
            return _Mixin.__str__(self, *args, **kwargs)
class Schema_Element(_Mixin):
    """
    xs:element

    An element without ``@type`` takes one attribute from each of its
    ``xs:complexType`` children.  An element with ``@type`` receives the
    content of that named type, except for a repeated ``group`` element
    of type ``nx:groupType`` (see the recursion guard below).

    :param lxml.etree.Element xml_obj: XML element
    :param str obj_name: optional, default taken from ``xml_obj``
    :param dict ns_dict: optional, default taken from :data:`NAMESPACE_DICT`
    :param obj schema_root: optional, instance of lxml.etree._Element

    :see: http://download.nexusformat.org/doc/html/nxdl.html
    :see: http://download.nexusformat.org/doc/html/nxdl_desc.html#nxdl-elements
    """

    def __init__(self, xml_obj, obj_name=None, ns_dict=None, schema_root=None):
        _Mixin.__init__(
            self,
            xml_obj,
            obj_name=obj_name,
            ns_dict=ns_dict,
            schema_root=schema_root)
        self.children = {}
        self.attrs = {}

        # the NXDL structural *type* referenced by this element
        type_ref = xml_obj.attrib.get('type')
        self.type = type_ref

        if type_ref is None:
            # structure is declared in-line
            for child in xml_obj:
                child_tag = child.tag
                if child_tag.endswith('}complexType'):
                    attribute_node = child.find('xs:attribute', self.ns)
                    attribute = Schema_Attribute(attribute_node, schema_root=self.lxml_root)
                    self.attrs[attribute.name] = attribute
                elif not child_tag.endswith('}annotation'):
                    raise_error(child, 'unhandled tag=', child_tag)
            return

        # avoid known infinite recursion: group may contain group(s)
        is_recursion = False
        if xml_obj.attrib['name'] == 'group' and type_ref == 'nx:groupType':
            if _GroupParsing().started:
                is_recursion = True
                # needs a special code to apply this rule
                #    isinstance(obj, _Recursion)
                self.children['group'] = _Recursion('group')
            # from now on, every such group element is treated as recursion
            _GroupParsing().started = True

        if not is_recursion:
            Schema_Type(type_ref, schema_root=self.lxml_root).copy_to(self)
class Schema_Type(_Mixin):
    """
    a named NXDL structure type (such as groupGroup)

    The node is located by name among the children of the schema root,
    then its children are parsed into ``self.attrs`` and ``self.children``.

    :param str ref: name of NXDL structure type (such as ``groupGroup``)
    :param str tag: XML Schema element tag, such as complexType (default=``*``)
    :param obj schema_root: optional, instance of lxml.etree._Element
    :raises ValueError: the named node is not found (exactly once)
        or it has a child with an unexpected tag

    :see: http://download.nexusformat.org/doc/html/nxdl.html
    :see: http://download.nexusformat.org/doc/html/nxdl_desc.html#nxdl-data-types-internal
    """

    def __init__(self, ref, tag = '*', schema_root=None):
        # _Mixin.__init__() is not called: its work is done here,
        # since the XML node must be looked up before it can be used
        self.ns = NAMESPACE_DICT
        self.lxml_root = schema_root

        xml_obj = self.get_named_node(tag, 'name', strip_ns(ref))
        self.name = xml_obj.attrib.get('name')
        self.attrs = {}
        self.children = {}

        # tag ending -> method that handles such a node
        handlers = (
            ('}attribute', self.parse_attribute),
            ('}attributeGroup', self.parse_attributeGroup),
            ('}complexContent', self.parse_complexContent),
            ('}group', self.parse_group),
            ('}sequence', self.parse_sequence),
        )

        for node in xml_obj:
            if isinstance(node, lxml.etree._Comment):
                continue
            node_tag = node.tag
            if node_tag.endswith('}annotation'):
                continue
            for ending, handler in handlers:
                if node_tag.endswith(ending):
                    handler(node)
                    break
            else:
                raise_error(node, 'unexpected tag=', node_tag)

    def parse_sequence(self, node):
        """
        add each ``xs:element`` and ``xs:group`` of the sequence to ``self.children``

        :param obj node: ``xs:sequence`` node
        :raises ValueError: (via :func:`raise_error`) unexpected tag
        """
        for subnode in node:
            subnode_tag = subnode.tag
            if subnode_tag.endswith(('}element', '}group')):
                element = Schema_Element(subnode, schema_root=self.lxml_root)
                self.children[element.name] = element
            elif not subnode_tag.endswith('}any'):
                # ``xs:any`` is not processed, only used for documentation
                raise_error(subnode, 'unexpected tag=', subnode_tag)
class _GroupParsing(singletons.Singleton):
    """
    internal: avoid a known recursion of group in a group

    ``started`` is ``False`` until :class:`Schema_Element` sets it
    after finding a ``group`` element of type ``nx:groupType``.
    """

    started = False
class _Recursion(_Mixin):
    """
    internal: an element used in recursion, such as child group of group

    There is no XML element behind this object, so the name
    cannot be taken from one and must be given.

    :param str obj_name: name of the element (such as ``group``)
    """

    def __init__(self, obj_name):
        _Mixin.__init__(self, xml_obj=None, obj_name=obj_name)
# if __name__ == '__main__':
# sm = SchemaManager()
# _breakpoint = True