#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------
# :author: Pete R. Jemian
# :email: prjemian@gmail.com
# :copyright: (c) 2016-2017, Pete R. Jemian
#
# Distributed under the terms of the Creative Commons Attribution 4.0 International Public License.
#
# The full license is in the file LICENSE.txt, distributed with this software.
# -----------------------------------------------------------------------------
"""
manages the XML Schema of this project
The *schema_manager* calls the *cache_manager* and
is called by *nxdl_manager*.
Public
.. autosummary::
~SchemaManager
~Schema_Root
~Schema_Attribute
~Schema_Element
~Schema_Type
~get_default_schema_manager
~raise_error
~strip_ns
Internal
.. autosummary::
~_Mixin
~_GroupParsing
~_Recursion
"""
from __future__ import print_function
import lxml.etree
import os
from . import NAMESPACE_DICT, FileNotFound, InvalidNxdlFile
from . import singletons
from . import utils
logger = utils.setup_logger(__name__)
def strip_ns(ref):
    """
    strip the namespace prefix from ``ref``

    :param str ref: one word, colon delimited string, such as *nx:groupGroup*
    :returns str: the part to the right of the last colon
        (all of ``ref`` when it contains no colon)
    """
    # rpartition() yields ("", "", ref) when there is no colon,
    # so the last item is always the un-prefixed name
    return ref.rpartition(":")[2]
def raise_error(node, text, obj):
    """
    standard *ValueError* exception handling

    The message has the form ``line <N>: <text><obj>`` where ``<N>``
    is taken from ``node.sourceline``.

    :param obj node: XML node being processed, must provide
        a ``sourceline`` attribute
    :param str text: label for ``obj``
    :param obj obj: value, converted with ``str()``
    :raises ValueError: always
    """
    msg = "line " + str(node.sourceline)
    msg += ": " + text + str(obj)
    raise ValueError(msg)
def get_default_schema_manager():
    """
    internal: convenience function

    :returns obj: the ``schema_manager`` of the default file set
        reported by the cache manager
    :raises ValueError: if the cache manager has no default file set
    """
    from punx import cache_manager

    cm = cache_manager.CacheManager()
    # explicit test (not ``assert``) so the check survives ``python -O``;
    # same condition and message as used by SchemaManager.__init__()
    if cm is None or cm.default_file_set is None:
        raise ValueError("Could not get NXDL file set from the cache")
    return cm.default_file_set.schema_manager
class SchemaManager(object):
    """
    describes the XML Schema for the NeXus NXDL definitions files

    :param str path: directory that contains ``nxdl.xsd``;
        when ``None``, the path of the cache manager's default file set is used
    :raises ValueError: if ``path`` is ``None`` and the cache manager
        has no default file set
    :raises FileNotFound: if ``nxdl.xsd`` does not exist in ``path``
    :raises InvalidNxdlFile: if the schema does not have exactly one
        ``xs:element`` child of its root
    """

    ns = NAMESPACE_DICT

    def __init__(self, path=None):
        from punx import cache_manager

        if path is None:
            cm = cache_manager.CacheManager()
            if cm is None or cm.default_file_set is None:
                raise ValueError("Could not get NXDL file set from the cache")
            path = cm.default_file_set.path
        schema_file = os.path.join(path, "nxdl.xsd")
        if not os.path.exists(schema_file):
            raise FileNotFound(schema_file)
        self.schema_file = schema_file

        self.lxml_tree = lxml.etree.parse(self.schema_file)
        self.lxml_schema = lxml.etree.XMLSchema(self.lxml_tree)
        self.lxml_root = self.lxml_tree.getroot()
        nodes = self.lxml_root.xpath("xs:element", namespaces=self.ns)
        if len(nodes) != 1:
            raise InvalidNxdlFile(self.schema_file)

        # Schema_Root calls back into parse_nxdlTypes() and
        # parse_nxdl_patterns() while self.lxml_root is still available
        self.nxdl = Schema_Root(
            nodes[0], ns_dict=self.ns, schema_root=self.lxml_root, schema_manager=self
        )

        # cleanup these internal structures
        del self.lxml_root
        # del self.lxml_schema    # needed for XML file validation
        del self.lxml_tree

    def parse_nxdl_patterns(self):
        """
        get regexp patterns for validItemName, validNXClassName, & validTargetName from nxdl.xsd

        :returns dict: :class:`Schema_pattern` objects, keyed by the name of
            each ``xs:simpleType`` whose name starts with ``valid``
        :raises InvalidNxdlFile: if such a type does not have
            exactly one ``xs:restriction`` child
        """
        # __init__() deletes self.lxml_root once the Schema_Root is built,
        # re-read the schema file when called after that point
        root = getattr(self, "lxml_root", None)
        if root is None:
            root = lxml.etree.parse(self.schema_file).getroot()

        db = {}
        for node in root.xpath("xs:simpleType", namespaces=self.ns):
            key = node.attrib["name"]
            if not key.startswith("valid"):
                continue

            obj = Schema_pattern()
            obj.pattern_name = key
            db[key] = obj

            subnodes = node.xpath("xs:restriction", namespaces=self.ns)
            if len(subnodes) != 1:
                raise InvalidNxdlFile(self.schema_file)
            restriction = subnodes[0]
            obj.base = strip_ns(restriction.attrib["base"])
            for item in restriction:
                if isinstance(item, lxml.etree._Comment):
                    continue
                if item.tag.endswith("}pattern"):
                    obj.re_list.append(item.attrib["value"])
                elif item.tag.endswith("}maxLength"):
                    obj.maxLength = int(item.attrib["value"])

        # adjust for any restrictions with NeXus base:
        # a pattern based on another "valid*" pattern takes that
        # pattern's base and maxLength and adds its expressions
        for v in db.values():
            if v.base != "token" and v.base in db:
                base = db[v.base]
                v.base = base.base
                v.maxLength = base.maxLength
                v.re_list += base.re_list

        return db

    def parse_nxdlTypes(self):
        """
        get the allowed data types and unit types from nxdlTypes.xsd

        :returns tuple: ``(db, units)`` where ``db`` is a dictionary of
            :class:`Schema_nxdlType` objects keyed by name
            (without ``anyUnitsAttr`` and ``primitiveType``) and
            ``units`` is the list of values of ``anyUnitsAttr``
        :raises ValueError: if the schema file is gone and the cache manager
            has no default file set
        :raises FileNotFound: if ``nxdlTypes.xsd`` does not exist
        :raises KeyError: if ``anyUnitsAttr`` or ``primitiveType``
            is not defined in ``nxdlTypes.xsd``
        """
        if os.path.exists(self.schema_file):
            # nxdlTypes.xsd is expected next to nxdl.xsd
            path = os.path.dirname(self.schema_file)
        else:
            from punx import cache_manager

            cm = cache_manager.CacheManager()
            if cm is None or cm.default_file_set is None:
                raise ValueError("Could not get NXDL file set from the cache")
            path = cm.default_file_set.path

        self.types_file = os.path.join(path, "nxdlTypes.xsd")
        if not os.path.exists(self.types_file):
            raise FileNotFound(self.types_file)
        lxml_types_tree = lxml.etree.parse(self.types_file)

        db = {}
        root = lxml_types_tree.getroot()
        for node in root:
            if isinstance(node, lxml.etree._Comment):
                continue
            if node.tag.endswith("}annotation"):
                continue
            obj = Schema_nxdlType(node, ns_dict=self.ns, schema_root=root)
            if obj.name is not None:
                db[obj.name] = obj

        # re-arrange
        units = list(db["anyUnitsAttr"].values or [])
        del db["anyUnitsAttr"]
        del db["primitiveType"]

        return db, units
class Schema_pattern(object):
    """
    describe the regular expression patterns for names of NeXus things
    """

    def __init__(self):
        # name of the XML Schema type this pattern restricts
        self.base = "token"
        # name of this pattern, assigned by the code that creates the object
        self.pattern_name = None
        # regular expressions (as text), each instance gets its own list
        self.re_list = []
        self.maxLength = -1  # unlimited
class Schema_nxdlType(object):
    """
    one of the types defined in the file *nxdlTypes.xsd*

    :param lxml.etree.Element xml_obj: XML element that defines the type
    :param dict ns_dict: optional, not used by this class
    :param obj schema_root: optional, root node of the schema, kept for reference
    :raises ValueError: if ``xml_obj`` has a child with an unhandled tag
    """

    def __init__(self, xml_obj, ns_dict=None, schema_root=None):
        self.name = xml_obj.attrib.get("name")
        self.restriction = None
        self.union = None
        self.values = None
        self.schema_root = schema_root
        self.attrs = {}

        for node in xml_obj:
            if isinstance(node, lxml.etree._Comment):
                continue
            if node.tag.endswith("}annotation"):
                continue

            if node.tag.endswith("}list"):
                # a real list (not a one-shot ``map`` iterator on Python 3)
                # so that the values can be read more than once
                self.values = [strip_ns(node.attrib["itemType"])]
            elif node.tag.endswith("}restriction"):
                self.restriction = strip_ns(node.attrib["base"])
                # comments have a callable ``tag``, so test for them first
                self.values = [
                    subnode.attrib["value"]
                    for subnode in node
                    if not isinstance(subnode, lxml.etree._Comment)
                    and subnode.tag.endswith("}enumeration")
                ]
            elif node.tag.endswith("}union"):
                self.union = [
                    strip_ns(member) for member in node.attrib["memberTypes"].split()
                ]
            else:
                raise_error(node, "unhandled tag=", node.tag)
class _Mixin(object):
    """
    common code for NXDL Rules classes below

    The ``parse_*()`` and ``copy_to()`` methods expect the subclass
    to provide ``self.attrs`` and ``self.children`` dictionaries
    and (for ``parse_complexContent()``) a ``parse_sequence()`` method.

    :param lxml.etree.Element xml_obj: XML element
    :param str obj_name: optional, default taken from ``xml_obj``
    :param dict ns_dict: optional, default taken from :data:`__init__.NAMESPACE_DICT`
    :param obj schema_root: optional, instance of lxml.etree._Element
    """

    def __init__(self, xml_obj, obj_name=None, ns_dict=None, schema_root=None):
        self.name = obj_name or xml_obj.attrib.get("name")
        self.ns = ns_dict or NAMESPACE_DICT
        self.lxml_root = schema_root

    def get_named_node(self, tag, attribute, value):
        """
        return a named node from the XML Schema

        :param str tag: XML Schema tag (such as "complexType") to match
        :param str attribute: attribute name to match
        :param str value: attribute value to match
        :returns obj: the one child of the schema root that matches
        :raises ValueError: if no schema root was given or
            if the number of matching nodes is not exactly one
        """
        if self.lxml_root is None:
            raise ValueError("no XML Schema root node to search")

        xpath_str = "xs:" + tag
        xpath_str += "[@" + attribute
        xpath_str += '="' + value + '"]'
        node_list = self.lxml_root.xpath(xpath_str, namespaces=self.ns)
        if len(node_list) != 1:
            msg = "wrong number of " + tag
            msg += " nodes found: " + str(len(node_list))
            raise ValueError(msg)

        return node_list[0]

    def copy_to(self, target):
        """
        copy results into target object

        Existing keys in ``target.attrs`` and ``target.children``
        are replaced by the entries from this object.

        :param obj target: instance of _Mixin, such as Schema_Element
        """
        target.attrs.update(self.attrs)
        target.children.update(self.children)

    def parse_attribute(self, node):
        """
        add the ``xs:attribute`` described by ``node`` to ``self.attrs``
        """
        obj = Schema_Attribute(node, schema_root=self.lxml_root)
        self.attrs[obj.name] = obj

    def parse_attributeGroup(self, node):
        """
        copy content of the type named by ``node/@ref`` into this object
        """
        obj = Schema_Type(node.attrib.get("ref"), schema_root=self.lxml_root)
        obj.copy_to(self)

    def parse_complexContent(self, node):
        """
        parse each ``xs:extension`` child of ``node``

        :raises ValueError: for a child that is not ``xs:extension``,
            an extension base other than ``nx:basicComponent``,
            or an unexpected child of the extension
        """
        for subnode in node:
            if isinstance(subnode, lxml.etree._Comment):
                continue
            if not subnode.tag.endswith("}extension"):
                raise_error(subnode, "unexpected tag=", subnode.tag)

            ref = subnode.attrib.get("base")
            # exact comparison: ``ref not in ("nx:basicComponent")`` was a
            # substring test on a string, not membership in a tuple
            if ref != "nx:basicComponent":
                raise_error(subnode, "unexpected base=", ref)
            obj = Schema_Type(ref, schema_root=self.lxml_root)
            obj.copy_to(self)

            # parse children of extension node
            for obj_node in subnode:
                if isinstance(obj_node, lxml.etree._Comment):
                    continue
                if obj_node.tag.endswith("}annotation"):
                    continue
                if obj_node.tag.endswith("}attribute"):
                    self.parse_attribute(obj_node)
                elif obj_node.tag.endswith("}sequence"):
                    self.parse_sequence(obj_node)
                else:
                    raise_error(obj_node, "unexpected tag=", obj_node.tag)

    def parse_group(self, node):
        """
        copy content of the type named by ``node/@ref`` into this object
        """
        obj = Schema_Type(node.attrib.get("ref"), schema_root=self.lxml_root)
        obj.copy_to(self)
class Schema_Root(_Mixin):
    """
    root element of the nxdl.xsd file

    :param lxml.etree.Element element_node: XML element
    :param str obj_name: optional, default taken from ``element_node``
    :param dict ns_dict: optional, default taken from :data:`NAMESPACE_DICT`
    :param obj schema_root: optional, instance of lxml.etree._Element
    :param obj schema_manager: optional, object that provides
        ``parse_nxdlTypes()`` and ``parse_nxdl_patterns()``
    :raises ValueError: if ``element_node`` has no ``type`` attribute or
        the type definition has an unhandled tag
    """

    # class-level defaults, each instance gets its own
    # ``attrs`` and ``children`` dictionaries in __init__()
    attrs = {}
    children = {}
    patterns = None
    type = None
    types = None
    units = None

    def __init__(
        self,
        element_node,
        obj_name=None,
        ns_dict=None,
        schema_root=None,
        schema_manager=None,
    ):
        _Mixin.__init__(
            self,
            element_node,
            obj_name=obj_name,
            ns_dict=ns_dict,
            schema_root=schema_root,
        )
        self.schema_manager = schema_manager

        # do not write into the dictionaries defined on the class,
        # those would be shared by every Schema_Root instance
        self.attrs = {}
        self.children = {}

        element_type = element_node.attrib.get("type")
        if element_type is None:
            element_name = element_node.attrib.get("name")
            raise_error(element_node, "no @type for element node: ", element_name)
        ref = strip_ns(element_type)

        type_node = self.get_named_node("complexType", "name", ref)
        for node in type_node:
            if isinstance(node, lxml.etree._Comment):
                continue
            if node.tag.endswith("}annotation"):
                continue
            if node.tag.endswith("}attribute"):
                self.parse_attribute(node)
            elif node.tag.endswith("}attributeGroup"):
                self.parse_attributeGroup(node)
            elif node.tag.endswith("}sequence"):
                self.parse_sequence(node)
            else:
                raise_error(node, "unhandled tag=", node.tag)

        if schema_manager is not None:
            self.types, self.units = schema_manager.parse_nxdlTypes()
            self.patterns = schema_manager.parse_nxdl_patterns()

        self.schema_types = dict(definition=self)  # FIXME:
        self.schema_types.update(self.children)

    def parse_sequence(self, seq_node):
        """
        parse the sequence used in the root element

        :param obj seq_node: ``xs:sequence`` node
        :raises ValueError: if a child is neither ``xs:element`` nor ``xs:group``
        """
        for node in seq_node:
            if isinstance(node, lxml.etree._Comment):
                continue
            if node.tag.endswith("}element"):
                obj = Schema_Element(node, schema_root=self.lxml_root)
                self.children[obj.name] = obj
            elif node.tag.endswith("}group"):
                self.parse_group(node)
            else:
                msg = "unhandled tag in ``definitionType``: "
                raise_error(node, msg, node.tag)
class Schema_Attribute(_Mixin):
    """
    xs:attribute element

    :param lxml.etree.Element xml_obj: XML element
    :param str obj_name: optional, default taken from ``xml_obj``
    :param dict ns_dict: optional, default taken from :data:`NAMESPACE_DICT`
    :param obj schema_root: optional, instance of lxml.etree._Element
    """

    def __init__(self, xml_obj, obj_name=None, ns_dict=None, schema_root=None):
        assert xml_obj is not None
        assert xml_obj.tag == "{" + xml_obj.nsmap["xs"] + "}attribute"
        _Mixin.__init__(
            self, xml_obj, obj_name=obj_name, ns_dict=ns_dict, schema_root=schema_root
        )

        use = xml_obj.attrib.get("use", "optional")
        self.required = use == "required"
        self.type = xml_obj.attrib.get("type", "str")

        default = xml_obj.attrib.get("default")
        if self.type == "nx:NX_BOOLEAN" and default is not None:
            # attribute values are always text: compare with "1", not 1
            self.default_value = default.lower() in ("true", "y", "1")
        else:
            # includes a boolean attribute with no default declared (None)
            self.default_value = default

        self.enum = self._restriction_values(xml_obj, "enumeration")
        self.patterns = self._restriction_values(xml_obj, "pattern")

    def _restriction_values(self, xml_obj, facet):
        """
        list ``@value`` of each ``xs:simpleType/xs:restriction/xs:<facet>`` node
        """
        xpath_str = "xs:simpleType/xs:restriction/xs:" + facet
        return [
            node.attrib["value"]
            for node in xml_obj.xpath(xpath_str, namespaces=self.ns)
            if node.attrib.get("value") is not None
        ]

    def __str__(self, *args, **kwargs):
        """
        text such as ``@name : type = choice1 | choice2``
        """
        try:
            s = "@" + self.name
            s += " : " + self.type
            if len(self.enum):
                s += " = "
                s += " | ".join(self.enum)
            return s
        except (AttributeError, TypeError):
            # name, type, or enum is missing or not text: use the default
            return _Mixin.__str__(self, *args, **kwargs)
class Schema_Element(_Mixin):
    """
    xs:element

    :param lxml.etree.Element xml_obj: XML element
    :param str obj_name: optional, default taken from ``xml_obj``
    :param dict ns_dict: optional, default taken from :data:`NAMESPACE_DICT`
    :param obj schema_root: optional, instance of lxml.etree._Element
    :raises ValueError: if an element with no ``type`` attribute
        has a child with an unhandled tag

    :see: http://download.nexusformat.org/doc/html/nxdl.html
    :see: http://download.nexusformat.org/doc/html/nxdl_desc.html#nxdl-elements
    """

    def __init__(self, xml_obj, obj_name=None, ns_dict=None, schema_root=None):
        _Mixin.__init__(
            self, xml_obj, obj_name=obj_name, ns_dict=ns_dict, schema_root=schema_root
        )
        self.children = {}
        self.attrs = {}

        # read & analyze the NXDL structural *type* referenced by *ref*
        ref = self.type = xml_obj.attrib.get("type")
        if ref is None:
            self._parse_inline_definition(xml_obj)
            return

        # avoid known infinite recursion: group may contain group(s)
        ok_to_parse = True
        if xml_obj.attrib.get("name") == "group" and ref == "nx:groupType":
            # NOTE(review): nothing visible in this module resets this flag,
            # it looks like only the first group encountered is ever parsed
            # -- confirm against singletons.Singleton
            if _GroupParsing().started:
                ok_to_parse = False
                # needs a special code to apply this rule
                #   isinstance(obj, _Recursion)
                self.children["group"] = _Recursion("group")
            _GroupParsing().started = True

        if ok_to_parse:
            type_obj = Schema_Type(ref, schema_root=self.lxml_root)
            type_obj.copy_to(self)

    def _parse_inline_definition(self, xml_obj):
        """
        internal: collect attributes from each inline ``xs:complexType`` child
        """
        for node in xml_obj:
            if isinstance(node, lxml.etree._Comment):
                continue
            if node.tag.endswith("}annotation"):
                continue
            if node.tag.endswith("}complexType"):
                # findall() handles a complexType with no attribute
                # as well as one with several
                for attr_node in node.findall("xs:attribute", self.ns):
                    a = Schema_Attribute(attr_node, schema_root=self.lxml_root)
                    self.attrs[a.name] = a
            else:
                raise_error(node, "unhandled tag=", node.tag)
class Schema_Type(_Mixin):
    """
    a named NXDL structure type (such as groupGroup)

    :param str ref: name of NXDL structure type (such as ``groupGroup``)
    :param str tag: XML Schema element tag, such as complexType (default=``*``)
    :param obj schema_root: optional, instance of lxml.etree._Element
    :raises ValueError: if ``ref`` does not match exactly one node
        or the type definition has an unexpected tag

    :see: http://download.nexusformat.org/doc/html/nxdl.html
    :see: http://download.nexusformat.org/doc/html/nxdl_desc.html#nxdl-data-types-internal
    """

    def __init__(self, ref, tag="*", schema_root=None):
        # _Mixin.__init__(self, xml_obj)
        # do the _Mixin.__init__ directly here
        # since the XML node is not known until it is found by name
        self.ns = NAMESPACE_DICT
        self.lxml_root = schema_root
        xml_obj = self.get_named_node(tag, "name", strip_ns(ref))
        self.name = xml_obj.attrib.get("name")
        self.attrs = {}
        self.children = {}

        for node in xml_obj:
            if isinstance(node, lxml.etree._Comment):
                continue
            if node.tag.endswith("}annotation"):
                continue
            if node.tag.endswith("}attribute"):
                self.parse_attribute(node)
            elif node.tag.endswith("}attributeGroup"):
                self.parse_attributeGroup(node)
            elif node.tag.endswith("}complexContent"):
                self.parse_complexContent(node)
            elif node.tag.endswith("}group"):
                self.parse_group(node)
            elif node.tag.endswith("}sequence"):
                self.parse_sequence(node)
            else:
                raise_error(node, "unexpected tag=", node.tag)

    def parse_sequence(self, node):
        """
        add each ``xs:element`` and ``xs:group`` child of ``node`` to ``self.children``

        :raises ValueError: if a child has an unexpected tag
        """
        for subnode in node:
            if isinstance(subnode, lxml.etree._Comment):
                continue
            if subnode.tag.endswith("}any"):
                # do not process this one, only used for documentation
                continue
            if subnode.tag.endswith(("}element", "}group")):
                obj = Schema_Element(subnode, schema_root=self.lxml_root)
                self.children[obj.name] = obj
            else:
                raise_error(subnode, "unexpected tag=", subnode.tag)
class _GroupParsing(singletons.Singleton):
    """
    internal: avoid a known recursion of group in a group
    """

    # set True by Schema_Element when it first meets a
    # ``group`` element of type ``nx:groupType``
    started = False
class _Recursion(_Mixin):
    """
    internal: an element used in recursion, such as child group of group

    :param str obj_name: name of the element, required
        (there is no XML element to take a default from)
    """

    def __init__(self, obj_name):
        _Mixin.__init__(self, None, obj_name=obj_name, ns_dict=None)
# if __name__ == '__main__':
# sm = SchemaManager()
# _breakpoint = True