"""\
N-Triples Parser
License: GPL 2, W3C, BSD, or MIT
Author: Sean B. Palmer, inamidst.com
"""
from __future__ import annotations
import codecs
import re
from io import BytesIO, StringIO, TextIOBase
from typing import (
IO,
TYPE_CHECKING,
Any,
Match,
MutableMapping,
Optional,
Pattern,
TextIO,
Union,
)
from rdflib.compat import _string_escape_map, decodeUnicodeEscape
from rdflib.exceptions import ParserError as ParseError
from rdflib.parser import InputSource, Parser
from rdflib.term import BNode as bNode
from rdflib.term import Literal, URIRef
from rdflib.term import URIRef as URI # noqa: N814
# These names are only needed by annotations; with
# ``from __future__ import annotations`` they are never evaluated at runtime,
# so the imports are confined to static type checking.
if TYPE_CHECKING:
    import typing_extensions as te

    from rdflib.graph import Graph, _ObjectType, _PredicateType, _SubjectType
# Public API of this module: the names exported by a star-import.
__all__ = [
    "unquote",
    "uriquote",
    "W3CNTriplesParser",
    "NTGraphSink",
    "NTParser",
    "DummySink",
]
# --- Regular-expression fragments for the N-Triples grammar -----------------

# A URI reference in angle brackets. Group 1 is the text between ``<`` and
# ``>``: at least one non-colon character, a colon, then any run of characters
# that are not whitespace, ``"``, ``<`` or ``>``.
uriref = r'<([^:]+:[^\s"<>]*)>'
# A double-quoted literal. Group 1 is the raw (still escaped) content; a
# backslash always consumes the character that follows it, so ``\"`` does not
# terminate the literal.
literal = r'"([^"\\]*(?:\\.[^"\\]*)*)"'
# Optional literal suffix: either ``@lang`` (language tag captured) or
# ``^^<datatype>`` (datatype URI captured by the embedded ``uriref`` group).
litinfo = r"(?:@([a-zA-Z]+(?:-[a-zA-Z0-9]+)*)|\^\^" + uriref + r")?"

# One line of input terminated by CRLF, CR or LF; group 1 excludes the terminator.
r_line = re.compile(r"([^\r\n]*)(?:\r\n|\r|\n)")
# Zero or more spaces/tabs (optional whitespace).
r_wspace = re.compile(r"[ \t]*")
# One or more spaces/tabs (mandatory separator between terms).
r_wspaces = re.compile(r"[ \t]+")
# Statement terminator: optional whitespace, ``.``, optional whitespace and an
# optional trailing ``#`` comment.
r_tail = re.compile(r"[ \t]*\.[ \t]*(#.*)?")
r_uriref = re.compile(uriref)
# Blank node ``_:label``; group 1 is the label. The label may contain ``-`` and
# ``.`` internally, but cannot start with ``-``/``.`` or end with ``.``.
r_nodeid = re.compile(r"_:([A-Za-z0-9_:]([-A-Za-z0-9_:\.]*[-A-Za-z0-9_:])?)")
# Literal plus its optional suffix; groups are (content, language, datatype URI).
r_literal = re.compile(literal + litinfo)

# Number of characters requested per ``read()`` call in ``W3CNTriplesParser.readline``.
bufsiz = 2048
# Module-wide switch read by ``unquote`` and ``uriquote``: when False they take
# their non-validating fast paths.
validate = False
class DummySink:
    """Minimal triple sink that counts triples and echoes them to stdout.

    It is the sink `W3CNTriplesParser` falls back to when none is supplied.
    """

    def __init__(self):
        # Number of triples received so far.
        self.length = 0

    def triple(self, s, p, o):
        """Record one triple: bump the counter and print the three terms.

        :param s: the subject term
        :param p: the predicate term
        :param o: the object term
        """
        self.length += 1
        print(s, p, o)
# Patterns used by the validating branch of ``unquote``.

# A run of printable ASCII characters (0x20-0x7E) excluding ``"`` (0x22) and
# ``\`` (0x5C), i.e. characters that need no unescaping.
r_safe = re.compile(r"([\x20\x21\x23-\x5B\x5D-\x7E]+)")
# A two-character escape: backslash followed by one of t, b, n, r, f, ", ' or \.
r_quot = re.compile(r"""\\([tbnrf"'\\])""")
# A numeric escape: ``\uXXXX`` (group 1, 4 hex digits) or ``\UXXXXXXXX``
# (group 2, 8 hex digits).
r_uniquot = re.compile(r"\\u([0-9A-Fa-f]{4})|\\U([0-9A-Fa-f]{8})")
def unquote(s: str) -> str:
    """Unquote an N-Triples string.

    When the module-level ``validate`` flag is false, the string is decoded in
    one step (``decodeUnicodeEscape`` for ``str`` input, the ``unicode-escape``
    codec for bytes-like input). When it is true, the string is walked token by
    token and anything that is neither a safe character run, a recognised
    two-character escape, nor a ``\\u``/``\\U`` escape is rejected.

    :param s: the escaped text as it appeared in the source
    :return: the text with escapes resolved
    :raises ParseError: (validating mode only) on an unknown escape, an illegal
        character, or a code point above U+10FFFF
    """
    if not validate:
        if isinstance(s, str):  # nquads
            s = decodeUnicodeEscape(s)
        else:
            s = s.decode("unicode-escape")  # type: ignore[unreachable]
        return s

    result = []
    # Walk the string with a cursor instead of re-slicing the remainder after
    # every token, which copied the tail of the string each time.
    pos = 0
    end = len(s)
    while pos < end:
        m = r_safe.match(s, pos)
        if m:
            pos = m.end()
            result.append(m.group(1))
            continue

        m = r_quot.match(s, pos)
        if m:
            # Backslash plus one escape character.
            pos += 2
            result.append(_string_escape_map[m.group(1)])
            continue

        m = r_uniquot.match(s, pos)
        if m:
            pos = m.end()
            u, U = m.groups()  # noqa: N806
            codepoint = int(u or U, 16)
            if codepoint > 0x10FFFF:
                raise ParseError("Disallowed codepoint: %08X" % codepoint)
            result.append(chr(codepoint))
        elif s.startswith("\\", pos):
            raise ParseError("Illegal escape at: %s..." % s[pos : pos + 10])
        else:
            raise ParseError("Illegal literal character: %r" % s[pos])
    return "".join(result)
# Any single character in the range U+0080-U+00FF.
r_hibyte = re.compile(r"([\x80-\xFF])")


def uriquote(uri: str) -> str:
    """Percent-escape high characters in a URI when validation is enabled.

    With the module-level ``validate`` flag false the URI is returned untouched.
    Otherwise every character in U+0080-U+00FF is replaced by ``%XX``, where
    ``XX`` is the upper-case hex value of its code point.

    NOTE(review): the escape uses the code point itself rather than the UTF-8
    bytes of the character, and characters above U+00FF are left as they are;
    confirm this legacy behaviour is what callers expect.

    :param uri: the (already unquoted) URI text
    :return: the URI, escaped as described above
    """
    if not validate:
        return uri
    return r_hibyte.sub(lambda m: "%%%02X" % ord(m.group(1)), uri)
# Mapping from a blank node label as written in the source (e.g. ``a`` for
# ``_:a``) to the blank node term it has been assigned.
_BNodeContextType = MutableMapping[str, bNode]
class W3CNTriplesParser:
    """An N-Triples Parser.

    This is a legacy-style Triples parser for NTriples provided by W3C

    Usage::

        p = W3CNTriplesParser(sink=MySink())
        sink = p.parse(f)  # file; use parsestring for a string

    To define a context in which blank node identifiers refer to the same blank node
    across instances of NTriplesParser, pass the same dict as ``bnode_context`` to each
    instance. By default, a new blank node context is created for each instance of
    `W3CNTriplesParser`.
    """

    __slots__ = ("_bnode_ids", "sink", "buffer", "file", "line")

    def __init__(
        self,
        sink: Optional[Union[DummySink, NTGraphSink]] = None,
        bnode_context: Optional[_BNodeContextType] = None,
    ):
        """
        :param sink: object whose ``triple(s, p, o)`` method receives every
            parsed statement; a `DummySink` is created when omitted
        :param bnode_context: mapping of blank node labels to blank nodes used
            when a call does not supply its own; a fresh dict when omitted
        """
        # Compare against None explicitly: an empty dict passed by the caller
        # is falsy but must still be used (and filled) as the shared context.
        if bnode_context is not None:
            self._bnode_ids = bnode_context
        else:
            self._bnode_ids = {}

        self.sink: Union[DummySink, NTGraphSink]
        if sink is not None:
            self.sink = sink
        else:
            self.sink = DummySink()

        # Text read from ``file`` but not yet split into lines.
        self.buffer: Optional[str] = None
        # The character stream currently being parsed.
        self.file: Optional[Union[TextIO, codecs.StreamReader]] = None
        # Unconsumed remainder of the line being parsed; None at end of input.
        self.line: Optional[str] = ""

    def parse(
        self,
        f: Union[TextIO, IO[bytes], codecs.StreamReader],
        bnode_context: Optional[_BNodeContextType] = None,
    ) -> Union[DummySink, NTGraphSink]:
        """
        Parse f as an N-Triples file.

        :type f: :term:`file object`
        :param f: the N-Triples source
        :type bnode_context: `dict`, optional
        :param bnode_context: a dict mapping blank node identifiers (e.g., ``a`` in ``_:a``)
                              to `~rdflib.term.BNode` instances. An empty dict can be
                              passed in to define a distinct context for a given call to
                              `parse`.
        :return: the sink that received the parsed triples
        :raises ParseError: if ``f`` has no ``read`` attribute, or if a line
            cannot be parsed. In the latter case the message carries the part
            of the line that had not been consumed when parsing stopped, and
            the underlying error is attached as ``__cause__``.
        """
        if not hasattr(f, "read"):
            raise ParseError("Item to parse must be a file-like object.")

        if not hasattr(f, "encoding") and not hasattr(f, "charbuffer"):
            # someone still using a bytestream here?
            f = codecs.getreader("utf-8")(f)

        self.file = f  # type: ignore[assignment]
        self.buffer = ""
        while True:
            self.line = self.readline()
            if self.line is None:
                break
            try:
                self.parseline(bnode_context=bnode_context)
            except ParseError as err:
                # Chain explicitly so the specific reason stays attached.
                raise ParseError("Invalid line: {}".format(self.line)) from err
        return self.sink

    def parsestring(self, s: Union[bytes, bytearray, str], **kwargs) -> None:
        """Parse s as an N-Triples string.

        Bytes-like input is decoded as UTF-8. Extra keyword arguments are
        forwarded to `parse`.

        :raises ParseError: if ``s`` is not ``str``, ``bytes`` or ``bytearray``
        """
        if not isinstance(s, (str, bytes, bytearray)):
            raise ParseError("Item to parse must be a string instance.")
        f: Union[codecs.StreamReader, StringIO]
        if isinstance(s, (bytes, bytearray)):
            f = codecs.getreader("utf-8")(BytesIO(s))
        else:
            f = StringIO(s)
        self.parse(f, **kwargs)

    def readline(self) -> Optional[str]:
        """Read an N-Triples line from buffered input.

        :return: the next line without its terminator, or ``None`` when the
            input is exhausted (a whitespace-only unterminated remainder is
            discarded)
        """
        # N-Triples lines end in either CRLF, CR, or LF
        # Therefore, we can't just use f.readline()
        if not self.buffer:
            buffer = self.file.read(bufsiz)  # type: ignore[union-attr]
            if not buffer:
                return None
            self.buffer = buffer

        while True:
            m = r_line.match(self.buffer)
            if m:  # the more likely prospect
                self.buffer = self.buffer[m.end() :]
                return m.group(1)

            # No complete line buffered yet: pull in more input.
            buffer = self.file.read(bufsiz)  # type: ignore[union-attr]
            if not buffer:
                if self.buffer.isspace():
                    return None
                # Last line does not need to be terminated with a newline
                buffer += "\n"
            self.buffer += buffer

    def parseline(self, bnode_context: Optional[_BNodeContextType] = None) -> None:
        """Parse the statement held in ``self.line`` and send it to the sink.

        Empty lines and ``#`` comment lines are ignored.

        :param bnode_context: blank node mapping to use for this line; the
            parser's own mapping is used when ``None``
        :raises ParseError: if the line is not a well-formed statement
        """
        self.eat(r_wspace)
        if (not self.line) or self.line.startswith("#"):
            return  # The line is empty or a comment

        subject = self.subject(bnode_context)
        self.eat(r_wspaces)

        predicate = self.predicate()
        self.eat(r_wspaces)

        object_ = self.object(bnode_context)
        self.eat(r_tail)

        if self.line:
            raise ParseError("Trailing garbage: {}".format(self.line))
        self.sink.triple(subject, predicate, object_)

    def peek(self, token: str) -> bool:
        """Return whether the unconsumed part of the line starts with ``token``."""
        return self.line.startswith(token)  # type: ignore[union-attr]

    def eat(self, pattern: Pattern[str]) -> Match[str]:
        """Match ``pattern`` at the start of the line and consume the match.

        :param pattern: compiled pattern to apply to the unconsumed line
        :return: the match object
        :raises ParseError: if the pattern does not match; the line is left
            untouched in that case
        """
        m = pattern.match(self.line)  # type: ignore[arg-type]
        if not m:
            raise ParseError("Failed to eat %s at %s" % (pattern.pattern, self.line))
        self.line = self.line[m.end() :]  # type: ignore[index]
        return m

    def subject(self, bnode_context=None) -> Union[bNode, URIRef]:
        """Consume and return the subject term (URI reference or blank node).

        :raises ParseError: if the line starts with neither
        """
        subj = self.uriref() or self.nodeid(bnode_context)
        if not subj:
            raise ParseError("Subject must be uriref or nodeID")
        return subj

    def predicate(self) -> URIRef:
        """Consume and return the predicate term, which must be a URI reference.

        :raises ParseError: if the line does not start with a URI reference
        """
        pred = self.uriref()
        if not pred:
            raise ParseError("Predicate must be uriref")
        return pred

    def object(
        self, bnode_context: Optional[_BNodeContextType] = None
    ) -> Union[URI, bNode, Literal]:
        """Consume and return the object term (URI reference, blank node or literal).

        :raises ParseError: if the line starts with none of these
        """
        objt = self.uriref() or self.nodeid(bnode_context) or self.literal()
        # Identity test on purpose: only the ``False`` sentinel returned by the
        # term readers means "nothing recognised".
        if objt is False:
            raise ParseError("Unrecognised object type")
        return objt

    def uriref(self) -> Union[te.Literal[False], URI]:
        """Consume a ``<...>`` URI reference if the line starts with ``<``.

        :return: the URI term, or ``False`` when the line does not start with ``<``
        :raises ParseError: if ``<`` is present but no URI reference matches
        """
        if not self.peek("<"):
            return False
        uri = self.eat(r_uriref).group(1)
        return URI(uriquote(unquote(uri)))

    def nodeid(
        self, bnode_context: Optional[_BNodeContextType] = None
    ) -> Union[te.Literal[False], bNode]:
        """Consume a ``_:label`` blank node if the line starts with ``_``.

        A label seen before in the active context maps to the same blank node;
        a new label gets a freshly generated one, which is stored in the context.

        :param bnode_context: mapping to consult and update; the parser's own
            mapping is used when ``None``
        :return: the blank node, or ``False`` when the line does not start with ``_``
        :raises ParseError: if ``_`` is present but no blank node label matches
        """
        if not self.peek("_"):
            return False

        # Fix for https://github.com/RDFLib/rdflib/issues/204
        if bnode_context is None:
            bnode_context = self._bnode_ids

        bnode_id = self.eat(r_nodeid).group(1)
        known = bnode_context.get(bnode_id)
        if known is not None:
            # Re-map to id specific to this doc
            return bNode(known)

        # Replace with freshly-generated document-specific BNode id
        bnode = bNode()
        # Store the mapping
        bnode_context[bnode_id] = bnode
        return bnode

    def literal(self) -> Union[te.Literal[False], Literal]:
        """Consume a quoted literal if the line starts with ``"``.

        :return: the literal with its optional language tag or datatype, or
            ``False`` when the line does not start with ``"``
        :raises ParseError: if ``"`` is present but no literal matches, or if
            both a language tag and a datatype were captured
        """
        if not self.peek('"'):
            return False

        lit, lang, dtype = self.eat(r_literal).groups()
        # Normalise an absent/empty capture to None.
        lang = lang or None
        if dtype:
            dtype = URI(uriquote(unquote(dtype)))
        else:
            dtype = None

        if lang and dtype:
            raise ParseError("Can't have both a language and a datatype")
        return Literal(unquote(lit), lang, dtype)
class NTGraphSink:
    """Triple sink that adds every received triple to a graph."""

    __slots__ = ("g",)

    def __init__(self, graph: Graph):
        """
        :param graph: the object whose ``add`` method receives the triples
        """
        self.g = graph

    def triple(self, s: _SubjectType, p: _PredicateType, o: _ObjectType) -> None:
        """Add the triple ``(s, p, o)`` to the wrapped graph."""
        self.g.add((s, p, o))
class NTParser(Parser):
    """parser for the ntriples format, often stored with the .nt extension

    See http://www.w3.org/TR/rdf-testcases/#ntriples"""

    __slots__ = ()

    @classmethod
    def parse(cls, source: InputSource, sink: Graph, **kwargs: Any) -> None:
        """
        Parse the NT format

        The stream obtained from ``source`` is closed when parsing finishes,
        whether it succeeded or raised.

        :type source: `rdflib.parser.InputSource`
        :param source: the source of NT-formatted data
        :type sink: `rdflib.graph.Graph`
        :param sink: where to send parsed triples
        :param kwargs: Additional arguments to pass to `.W3CNTriplesParser.parse`
        """
        f: Union[TextIO, IO[bytes], codecs.StreamReader]
        f = source.getCharacterStream()
        if not f:
            b = source.getByteStream()
            # TextIOBase includes: StringIO and TextIOWrapper
            if isinstance(b, TextIOBase):
                # f is not really a ByteStream, but a CharacterStream
                f = b  # type: ignore[assignment]
            else:
                # since N-Triples 1.1 files can and should be utf-8 encoded
                f = codecs.getreader("utf-8")(b)
        parser = W3CNTriplesParser(NTGraphSink(sink))
        try:
            parser.parse(f, **kwargs)
        finally:
            # Release the stream even when parsing fails part-way through.
            f.close()