"""
An RDF/XML parser for RDFLib
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Dict, List, NoReturn, Optional, Tuple
from urllib.parse import urldefrag, urljoin
from xml.sax import handler, make_parser, xmlreader
from xml.sax.handler import ErrorHandler
from xml.sax.saxutils import escape, quoteattr
from rdflib.exceptions import Error, ParserError
from rdflib.graph import Graph
from rdflib.namespace import RDF, is_ncname
from rdflib.parser import InputSource, Parser
from rdflib.plugins.parsers.RDFVOC import RDFVOC
from rdflib.term import BNode, Identifier, Literal, URIRef
if TYPE_CHECKING:
# from xml.sax.expatreader import ExpatLocator
from xml.sax.xmlreader import AttributesImpl, Locator
from rdflib.graph import _ObjectType, _SubjectType, _TripleType
__all__ = ["create_parser", "BagID", "ElementHandler", "RDFXMLHandler", "RDFXMLParser"]
RDFNS = RDFVOC
# http://www.w3.org/TR/rdf-syntax-grammar/#eventterm-attribute-URI
# A mapping from unqualified terms to their qualified version.
UNQUALIFIED = {
"about": RDFVOC.about,
"ID": RDFVOC.ID,
"type": RDFVOC.type,
"resource": RDFVOC.resource,
"parseType": RDFVOC.parseType,
}
# http://www.w3.org/TR/rdf-syntax-grammar/#coreSyntaxTerms
CORE_SYNTAX_TERMS = [
RDFVOC.RDF,
RDFVOC.ID,
RDFVOC.about,
RDFVOC.parseType,
RDFVOC.resource,
RDFVOC.nodeID,
RDFVOC.datatype,
]
# http://www.w3.org/TR/rdf-syntax-grammar/#syntaxTerms
SYNTAX_TERMS = CORE_SYNTAX_TERMS + [RDFVOC.Description, RDFVOC.li]
# http://www.w3.org/TR/rdf-syntax-grammar/#oldTerms
OLD_TERMS = [
URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#aboutEach"),
URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#aboutEachPrefix"),
URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#bagID"),
]
NODE_ELEMENT_EXCEPTIONS = (
CORE_SYNTAX_TERMS
+ [
RDFVOC.li,
]
+ OLD_TERMS
)
NODE_ELEMENT_ATTRIBUTES = [RDFVOC.ID, RDFVOC.nodeID, RDFVOC.about]
PROPERTY_ELEMENT_EXCEPTIONS = (
CORE_SYNTAX_TERMS
+ [
RDFVOC.Description,
]
+ OLD_TERMS
)
PROPERTY_ATTRIBUTE_EXCEPTIONS = (
CORE_SYNTAX_TERMS + [RDFVOC.Description, RDFVOC.li] + OLD_TERMS
)
PROPERTY_ELEMENT_ATTRIBUTES = [RDFVOC.ID, RDFVOC.resource, RDFVOC.nodeID]
XMLNS = "http://www.w3.org/XML/1998/namespace"
BASE = (XMLNS, "base")
LANG = (XMLNS, "lang")
[docs]class BagID(URIRef):
__slots__ = ["li"]
[docs] def __init__(self, val):
super(URIRef, self).__init__(val)
self.li = 0
[docs] def next_li(self):
self.li += 1
return RDFNS["_%s" % self.li]
[docs]class ElementHandler(object):
__slots__ = [
"start",
"char",
"end",
"li",
"id",
"base",
"subject",
"predicate",
"object",
"list",
"language",
"datatype",
"declared",
"data",
]
[docs] def __init__(self):
self.start = None
self.char = None
self.end = None
self.li = 0
self.id = None
self.base = None
self.subject = None
self.object = None
self.list = None
self.language = None
self.datatype = None
self.declared = None
self.data = None
[docs] def next_li(self):
self.li += 1
return RDFVOC["_%s" % self.li]
[docs]class RDFXMLHandler(handler.ContentHandler):
[docs] def __init__(self, store: Graph):
self.store = store
self.preserve_bnode_ids = False
self.reset()
[docs] def reset(self) -> None:
document_element = ElementHandler()
document_element.start = self.document_element_start
document_element.end = lambda name, qname: None
self.stack: List[Optional[ElementHandler]] = [
None,
document_element,
]
self.ids: Dict[str, int] = {} # remember IDs we have already seen
self.bnode: Dict[str, Identifier] = {}
self._ns_contexts: List[Dict[str, Optional[str]]] = [
{}
] # contains uri -> prefix dicts
self._current_context: Dict[str, Optional[str]] = self._ns_contexts[-1]
# ContentHandler methods
[docs] def setDocumentLocator(self, locator: Locator):
self.locator = locator
[docs] def startDocument(self) -> None:
pass
[docs] def startPrefixMapping(self, prefix: Optional[str], namespace: str) -> None:
self._ns_contexts.append(self._current_context.copy())
self._current_context[namespace] = prefix
self.store.bind(prefix, namespace or "", override=False)
[docs] def endPrefixMapping(self, prefix: Optional[str]) -> None:
self._current_context = self._ns_contexts[-1]
del self._ns_contexts[-1]
[docs] def startElementNS(
self, name: Tuple[Optional[str], str], qname, attrs: AttributesImpl
) -> None:
stack = self.stack
stack.append(ElementHandler())
current = self.current
parent = self.parent
base = attrs.get(BASE, None)
if base is not None:
base, frag = urldefrag(base)
if parent and parent.base:
base = urljoin(parent.base, base)
else:
systemId = self.locator.getPublicId() or self.locator.getSystemId()
if systemId:
base = urljoin(systemId, base)
else:
if parent:
base = parent.base
if base is None:
systemId = self.locator.getPublicId() or self.locator.getSystemId()
if systemId:
base, frag = urldefrag(systemId)
current.base = base
language = attrs.get(LANG, None)
if language is None:
if parent:
language = parent.language
current.language = language
current.start(name, qname, attrs)
[docs] def endElementNS(self, name: Tuple[Optional[str], str], qname) -> None:
self.current.end(name, qname)
self.stack.pop()
[docs] def characters(self, content: str) -> None:
char = self.current.char
if char:
char(content)
[docs] def ignorableWhitespace(self, content) -> None:
pass
[docs] def processingInstruction(self, target, data) -> None:
pass
[docs] def add_reified(self, sid: Identifier, spo: _TripleType):
s, p, o = spo
self.store.add((sid, RDF.type, RDF.Statement))
self.store.add((sid, RDF.subject, s))
self.store.add((sid, RDF.predicate, p))
self.store.add((sid, RDF.object, o))
[docs] def error(self, message: str) -> NoReturn:
locator = self.locator
info = "%s:%s:%s: " % (
locator.getSystemId(),
locator.getLineNumber(),
locator.getColumnNumber(),
)
raise ParserError(info + message)
[docs] def get_current(self) -> Optional[ElementHandler]:
return self.stack[-2]
# Create a read only property called current so that self.current
# give the current element handler.
current = property(get_current)
[docs] def get_next(self) -> Optional[ElementHandler]:
return self.stack[-1]
# Create a read only property that gives the element handler to be
# used for the next element.
next = property(get_next)
[docs] def get_parent(self) -> Optional[ElementHandler]:
return self.stack[-3]
# Create a read only property that gives the current parent
# element handler
parent = property(get_parent)
[docs] def absolutize(self, uri: str) -> URIRef:
# type error: Argument "allow_fragments" to "urljoin" has incompatible type "int"; expected "bool"
result = urljoin(self.current.base, uri, allow_fragments=1) # type: ignore[arg-type]
if uri and uri[-1] == "#" and result[-1] != "#":
result = "%s#" % result
return URIRef(result)
[docs] def convert(
self, name: Tuple[Optional[str], str], qname, attrs: AttributesImpl
) -> Tuple[URIRef, Dict[URIRef, str]]:
if name[0] is None:
# type error: Incompatible types in assignment (expression has type "URIRef", variable has type "Tuple[Optional[str], str]")
name = URIRef(name[1]) # type: ignore[assignment]
else:
# type error: Incompatible types in assignment (expression has type "URIRef", variable has type "Tuple[Optional[str], str]")
# type error: Argument 1 to "join" of "str" has incompatible type "Tuple[Optional[str], str]"; expected "Iterable[str]"
name = URIRef("".join(name)) # type: ignore[assignment, arg-type]
atts = {}
for n, v in attrs.items():
if n[0] is None:
att = n[1]
else:
att = "".join(n)
if att.startswith(XMLNS) or att[0:3].lower() == "xml":
pass
elif att in UNQUALIFIED:
# if not RDFNS[att] in atts:
# type error: Variable "att" is not valid as a type
atts[RDFNS[att]] = v # type: ignore[misc, valid-type]
else:
atts[URIRef(att)] = v
# type error: Incompatible return value type (got "Tuple[Tuple[Optional[str], str], Dict[Any, Any]]", expected "Tuple[URIRef, Dict[URIRef, str]]")
return name, atts # type: ignore[return-value]
[docs] def document_element_start(
self, name: Tuple[str, str], qname, attrs: AttributesImpl
) -> None:
if name[0] and URIRef("".join(name)) == RDFVOC.RDF:
# Cheap hack so 2to3 doesn't turn it into __next__
next = getattr(self, "next")
next.start = self.node_element_start
next.end = self.node_element_end
else:
self.node_element_start(name, qname, attrs)
# self.current.end = self.node_element_end
# TODO... set end to something that sets start such that
# another element will cause error
[docs] def node_element_start(
self, name: Tuple[str, str], qname, attrs: AttributesImpl
) -> None:
# type error: Incompatible types in assignment (expression has type "URIRef", variable has type "Tuple[str, str]")
name, atts = self.convert(name, qname, attrs) # type: ignore[assignment]
current = self.current
absolutize = self.absolutize
# Cheap hack so 2to3 doesn't turn it into __next__
next = getattr(self, "next")
next.start = self.property_element_start
next.end = self.property_element_end
if name in NODE_ELEMENT_EXCEPTIONS:
# type error: Not all arguments converted during string formatting
self.error("Invalid node element URI: %s" % name) # type: ignore[str-format]
subject: _SubjectType
if RDFVOC.ID in atts:
if RDFVOC.about in atts or RDFVOC.nodeID in atts:
self.error("Can have at most one of rdf:ID, rdf:about, and rdf:nodeID")
id = atts[RDFVOC.ID]
if not is_ncname(id):
self.error("rdf:ID value is not a valid NCName: %s" % id)
subject = absolutize("#%s" % id)
if subject in self.ids:
self.error("two elements cannot use the same ID: '%s'" % subject)
self.ids[subject] = 1 # IDs can only appear once within a document
elif RDFVOC.nodeID in atts:
if RDFVOC.ID in atts or RDFVOC.about in atts:
self.error("Can have at most one of rdf:ID, rdf:about, and rdf:nodeID")
nodeID = atts[RDFVOC.nodeID]
if not is_ncname(nodeID):
self.error("rdf:nodeID value is not a valid NCName: %s" % nodeID)
if self.preserve_bnode_ids is False:
if nodeID in self.bnode:
subject = self.bnode[nodeID]
else:
subject = BNode()
self.bnode[nodeID] = subject
else:
subject = BNode(nodeID)
elif RDFVOC.about in atts:
if RDFVOC.ID in atts or RDFVOC.nodeID in atts:
self.error("Can have at most one of rdf:ID, rdf:about, and rdf:nodeID")
subject = absolutize(atts[RDFVOC.about])
else:
subject = BNode()
if name != RDFVOC.Description: # S1
# error: Argument 1 has incompatible type "Tuple[str, str]"; expected "str"
self.store.add((subject, RDF.type, absolutize(name))) # type: ignore[arg-type]
object: _ObjectType
language = current.language
for att in atts:
if not att.startswith(str(RDFNS)):
predicate = absolutize(att)
try:
object = Literal(atts[att], language)
except Error as e:
# type error: Argument 1 to "error" of "RDFXMLHandler" has incompatible type "Optional[str]"; expected "str"
self.error(e.msg) # type: ignore[arg-type]
elif att == RDF.type: # S2
predicate = RDF.type
object = absolutize(atts[RDF.type])
elif att in NODE_ELEMENT_ATTRIBUTES:
continue
elif att in PROPERTY_ATTRIBUTE_EXCEPTIONS: # S3
self.error("Invalid property attribute URI: %s" % att)
# type error: Statement is unreachable
continue # type: ignore[unreachable] # for when error does not throw an exception
else:
predicate = absolutize(att)
try:
object = Literal(atts[att], language)
except Error as e:
# type error: Argument 1 to "error" of "RDFXMLHandler" has incompatible type "Optional[str]"; expected "str"
self.error(e.msg) # type: ignore[arg-type]
self.store.add((subject, predicate, object))
current.subject = subject
[docs] def node_element_end(self, name: Tuple[str, str], qname) -> None:
# repeat node-elements are only allowed
# at at top-level
if self.parent.object and self.current != self.stack[2]:
self.error(
"Repeat node-elements inside property elements: %s" % "".join(name)
)
self.parent.object = self.current.subject
[docs] def property_element_start(
self, name: Tuple[str, str], qname, attrs: AttributesImpl
) -> None:
# type error: Incompatible types in assignment (expression has type "URIRef", variable has type "Tuple[str, str]")
name, atts = self.convert(name, qname, attrs) # type: ignore[assignment]
current = self.current
absolutize = self.absolutize
# Cheap hack so 2to3 doesn't turn it into __next__
next = getattr(self, "next")
object: Optional[_ObjectType] = None
current.data = None
current.list = None
# type error: "Tuple[str, str]" has no attribute "startswith"
if not name.startswith(str(RDFNS)): # type: ignore[attr-defined]
# type error: Argument 1 has incompatible type "Tuple[str, str]"; expected "str"
current.predicate = absolutize(name) # type: ignore[arg-type]
elif name == RDFVOC.li:
current.predicate = current.next_li()
elif name in PROPERTY_ELEMENT_EXCEPTIONS:
# type error: Not all arguments converted during string formatting
self.error("Invalid property element URI: %s" % name) # type: ignore[str-format]
else:
# type error: Argument 1 has incompatible type "Tuple[str, str]"; expected "str"
current.predicate = absolutize(name) # type: ignore[arg-type]
id = atts.get(RDFVOC.ID, None)
if id is not None:
if not is_ncname(id):
self.error("rdf:ID value is not a value NCName: %s" % id)
current.id = absolutize("#%s" % id)
else:
current.id = None
resource = atts.get(RDFVOC.resource, None)
nodeID = atts.get(RDFVOC.nodeID, None)
parse_type = atts.get(RDFVOC.parseType, None)
if resource is not None and nodeID is not None:
self.error("Property element cannot have both rdf:nodeID and rdf:resource")
if resource is not None:
object = absolutize(resource)
next.start = self.node_element_start
next.end = self.node_element_end
elif nodeID is not None:
if not is_ncname(nodeID):
self.error("rdf:nodeID value is not a valid NCName: %s" % nodeID)
if self.preserve_bnode_ids is False:
if nodeID in self.bnode:
object = self.bnode[nodeID]
else:
subject = BNode()
self.bnode[nodeID] = subject
object = subject
else:
object = subject = BNode(nodeID)
next.start = self.node_element_start
next.end = self.node_element_end
else:
if parse_type is not None:
for att in atts:
if att != RDFVOC.parseType and att != RDFVOC.ID:
self.error("Property attr '%s' now allowed here" % att)
if parse_type == "Resource":
current.subject = object = BNode()
current.char = self.property_element_char
next.start = self.property_element_start
next.end = self.property_element_end
elif parse_type == "Collection":
current.char = None
object = current.list = RDF.nil # BNode()
# self.parent.subject
next.start = self.node_element_start
next.end = self.list_node_element_end
else: # if parse_type=="Literal":
# All other values are treated as Literal
# See: http://www.w3.org/TR/rdf-syntax-grammar/
# parseTypeOtherPropertyElt
object = Literal("", datatype=RDFVOC.XMLLiteral)
current.char = self.literal_element_char
current.declared = {XMLNS: "xml"}
next.start = self.literal_element_start
next.char = self.literal_element_char
next.end = self.literal_element_end
current.object = object
return
else:
object = None
current.char = self.property_element_char
next.start = self.node_element_start
next.end = self.node_element_end
datatype = current.datatype = atts.get(RDFVOC.datatype, None)
language = current.language
if datatype is not None:
# TODO: check that there are no atts other than datatype and id
datatype = absolutize(datatype)
else:
for att in atts:
if not att.startswith(str(RDFNS)):
predicate = absolutize(att)
elif att in PROPERTY_ELEMENT_ATTRIBUTES:
continue
elif att in PROPERTY_ATTRIBUTE_EXCEPTIONS:
self.error("""Invalid property attribute URI: %s""" % att)
else:
predicate = absolutize(att)
o: _ObjectType
if att == RDF.type:
o = URIRef(atts[att])
else:
if datatype is not None:
# type error: Statement is unreachable
language = None # type: ignore[unreachable]
o = Literal(atts[att], language, datatype)
if object is None:
object = BNode()
self.store.add((object, predicate, o))
if object is None:
current.data = ""
current.object = None
else:
current.data = None
current.object = object
[docs] def property_element_char(self, data: str) -> None:
current = self.current
if current.data is not None:
current.data += data
[docs] def property_element_end(self, name: Tuple[str, str], qname) -> None:
current = self.current
if current.data is not None and current.object is None:
literalLang = current.language
if current.datatype is not None:
literalLang = None
current.object = Literal(current.data, literalLang, current.datatype)
current.data = None
if self.next.end == self.list_node_element_end:
if current.object != RDF.nil:
self.store.add((current.list, RDF.rest, RDF.nil))
if current.object is not None:
self.store.add((self.parent.subject, current.predicate, current.object))
if current.id is not None:
self.add_reified(
current.id, (self.parent.subject, current.predicate, current.object)
)
current.subject = None
[docs] def list_node_element_end(self, name: Tuple[str, str], qname) -> None:
current = self.current
if self.parent.list == RDF.nil:
list = BNode()
# Removed between 20030123 and 20030905
# self.store.add((list, RDF.type, LIST))
self.parent.list = list
self.store.add((self.parent.list, RDF.first, current.subject))
self.parent.object = list
self.parent.char = None
else:
list = BNode()
# Removed between 20030123 and 20030905
# self.store.add((list, RDF.type, LIST))
self.store.add((self.parent.list, RDF.rest, list))
self.store.add((list, RDF.first, current.subject))
self.parent.list = list
[docs] def literal_element_start(
self, name: Tuple[str, str], qname, attrs: AttributesImpl
) -> None:
current = self.current
self.next.start = self.literal_element_start
self.next.char = self.literal_element_char
self.next.end = self.literal_element_end
current.declared = self.parent.declared.copy()
if name[0]:
prefix = self._current_context[name[0]]
if prefix:
current.object = "<%s:%s" % (prefix, name[1])
else:
current.object = "<%s" % name[1]
if not name[0] in current.declared:
current.declared[name[0]] = prefix
if prefix:
current.object += ' xmlns:%s="%s"' % (prefix, name[0])
else:
current.object += ' xmlns="%s"' % name[0]
else:
current.object = "<%s" % name[1]
for name, value in attrs.items():
if name[0]:
if not name[0] in current.declared:
current.declared[name[0]] = self._current_context[name[0]]
name = current.declared[name[0]] + ":" + name[1]
else:
# type error: Incompatible types in assignment (expression has type "str", variable has type "Tuple[str, str]")
name = name[1] # type: ignore[assignment]
current.object += " %s=%s" % (name, quoteattr(value))
current.object += ">"
[docs] def literal_element_char(self, data: str) -> None:
self.current.object += escape(data)
[docs] def literal_element_end(self, name: Tuple[str, str], qname) -> None:
if name[0]:
prefix = self._current_context[name[0]]
if prefix:
end = "</%s:%s>" % (prefix, name[1])
else:
end = "</%s>" % name[1]
else:
end = "</%s>" % name[1]
self.parent.object += self.current.object + end
[docs]def create_parser(target: InputSource, store: Graph) -> xmlreader.XMLReader:
parser = make_parser()
try:
# Workaround for bug in expatreader.py. Needed when
# expatreader is trying to guess a prefix.
parser.start_namespace_decl("xml", "http://www.w3.org/XML/1998/namespace") # type: ignore[attr-defined]
except AttributeError:
pass # Not present in Jython (at least)
parser.setFeature(handler.feature_namespaces, 1)
rdfxml = RDFXMLHandler(store)
# type error: Argument 1 to "setDocumentLocator" of "RDFXMLHandler" has incompatible type "InputSource"; expected "Locator"
rdfxml.setDocumentLocator(target) # type: ignore[arg-type]
# rdfxml.setDocumentLocator(_Locator(self.url, self.parser))
parser.setContentHandler(rdfxml)
parser.setErrorHandler(ErrorHandler())
return parser
[docs]class RDFXMLParser(Parser):
[docs] def __init__(self):
pass
[docs] def parse(self, source: InputSource, sink: Graph, **args: Any) -> None:
self._parser = create_parser(source, sink)
content_handler = self._parser.getContentHandler()
preserve_bnode_ids = args.get("preserve_bnode_ids", None)
if preserve_bnode_ids is not None:
content_handler.preserve_bnode_ids = preserve_bnode_ids
# # We're only using it once now
# content_handler.reset()
# self._parser.reset()
self._parser.parse(source)