Source code for rdflib.plugins.parsers.ntriples

#!/usr/bin/env python
__doc__ = """
N-Triples Parser
License: GPL 2, W3C, BSD, or MIT
Author: Sean B. Palmer, inamidst.com
"""

import re
import codecs

from rdflib.term import URIRef as URI
from rdflib.term import BNode as bNode
from rdflib.term import Literal

from rdflib.py3compat import cast_bytes, decodeUnicodeEscape

__all__ = ['unquote', 'uriquote', 'Sink', 'NTriplesParser']

# --- Regular-expression fragments for the N-Triples terminals -------------
# An IRI in angle brackets that contains a scheme separator ':'.
# Group 1 is the text between the brackets.
uriref = r'<([^:]+:[^\s"<>]+)>'
# A double-quoted string; a backslash escapes the following character.
# Group 1 is the still-escaped content between the quotes.
literal = r'"([^"\\]*(?:\\.[^"\\]*)*)"'
# Optional suffix of a literal: either "@lang" (captured) or "^^<datatype>"
# (datatype IRI captured by the embedded ``uriref`` group).
# The primary language subtag accepts upper- and lower-case letters, as the
# N-Triples LANGTAG production does ('@' [a-zA-Z]+ ('-' [a-zA-Z0-9]+)*).
litinfo = r'(?:@([a-zA-Z]+(?:-[a-zA-Z0-9]+)*)|\^\^' + uriref + r')?'

# One line of input up to and including its terminator (CRLF, CR or LF).
# Group 1 is the line without the terminator.
r_line = re.compile(r'([^\r\n]*)(?:\r\n|\r|\n)')
# Optional / mandatory horizontal whitespace between terms.
r_wspace = re.compile(r'[ \t]*')
r_wspaces = re.compile(r'[ \t]+')
# The statement-terminating '.', optionally followed by a '#' comment.
r_tail = re.compile(r'[ \t]*\.[ \t]*(#.*)?')
r_uriref = re.compile(uriref)
# A blank node "_:label"; group 1 is the label.
# The label may contain letters, digits, '_', ':' and '-', plus '.' anywhere
# except as the last character, so a statement-terminating '.' written
# directly after the label is not swallowed.  An empty label is still
# tolerated so that input accepted by the previous, narrower pattern keeps
# parsing.
r_nodeid = re.compile(
    r'_:((?:[A-Za-z0-9_:](?:[-A-Za-z0-9_:.]*[-A-Za-z0-9_:])?)?)')
# A literal with its optional language tag or datatype.
# Groups: 1 = escaped string, 2 = language tag, 3 = datatype IRI.
r_literal = re.compile(literal + litinfo)

# Number of characters requested from the input per read.
bufsiz = 2048
# When true, unquote() and uriquote() take their strict, validating paths.
validate = False


class Node(unicode):
    """Plain ``unicode`` subclass that adds no behaviour of its own."""


class ParseError(Exception):
    """Error raised by this module when input cannot be parsed."""


class Sink(object):
    """Minimal triple receiver: counts the triples it is given and prints them."""

    def __init__(self):
        """Start with an empty triple count."""
        # Number of triples passed to triple() so far.
        self.length = 0

    def triple(self, s, p, o):
        """Count one (subject, predicate, object) triple and print it."""
        self.length = self.length + 1
        print (s, p, o)
# Single-character escapes handled by unquote() in validating mode, keyed by
# the character that follows the backslash.
quot = {
    't': u'\t',
    'n': u'\n',
    'r': u'\r',
    '"': u'"',
    '\\': u'\\',
}
# A run of characters from space to tilde, excluding the double quote (\x22)
# and the backslash (\x5C); such runs are copied through unchanged.
r_safe = re.compile(r'([\x20\x21\x23-\x5B\x5D-\x7E]+)')
# A backslash followed by one of the keys of ``quot``.
r_quot = re.compile(r'\\(t|n|r|"|\\)')
# \uXXXX (group 1) or \UXXXXXXXX (group 2), upper-case hex digits only.
r_uniquot = re.compile(r'\\u([0-9A-F]{4})|\\U([0-9A-F]{8})')
def unquote(s):
    """Unquote an N-Triples string.

    With the module-level ``validate`` flag off, the whole string is handed
    to a unicode-escape decoder.  With it on, the string is consumed piece
    by piece and anything that is neither a safe character run, a known
    backslash escape nor a ``\\uXXXX`` / ``\\UXXXXXXXX`` escape raises
    ParseError.
    """
    if not validate:
        if isinstance(s, unicode):  # nquads
            return decodeUnicodeEscape(s)
        return s.decode('unicode-escape')

    pieces = []
    rest = s
    while rest:
        # Plain characters: copy the whole run through untouched.
        safe_run = r_safe.match(rest)
        if safe_run:
            pieces.append(safe_run.group(1))
            rest = rest[safe_run.end():]
            continue

        # Two-character escapes such as \n or \".
        escape = r_quot.match(rest)
        if escape:
            pieces.append(quot[escape.group(1)])
            rest = rest[2:]
            continue

        # Anything else must be a numeric escape; otherwise it is an error.
        numeric = r_uniquot.match(rest)
        if not numeric:
            if rest.startswith('\\'):
                raise ParseError("Illegal escape at: %s..." % rest[:10])
            raise ParseError("Illegal literal character: %r" % rest[0])

        short_hex, long_hex = numeric.groups()
        codepoint = int(short_hex or long_hex, 16)
        if codepoint > 0x10FFFF:
            raise ParseError("Disallowed codepoint: %08X" % codepoint)
        pieces.append(unichr(codepoint))
        rest = rest[numeric.end():]

    return u''.join(pieces)
# Matches one character in the range U+0080..U+00FF.
# Written as an escaped unicode literal instead of ur'...': the ``ur``
# prefix is a SyntaxError on Python 3, while this spelling yields the very
# same regex source on Python 2 and on Python 3.3+.
r_hibyte = re.compile(u'([\\x80-\\xFF])')


def uriquote(uri):
    """Percent-escape high characters in *uri* when validating.

    If the module-level ``validate`` flag is false, *uri* is returned
    unchanged.  Otherwise every character matched by ``r_hibyte`` is
    replaced by ``%XX``, the two-digit upper-case hex of its code point.
    """
    if not validate:
        return uri
    return r_hibyte.sub(lambda m: '%%%02X' % ord(m.group(1)), uri)
class NTriplesParser(object):
    """An N-Triples Parser.

    Usage::

        p = NTriplesParser(sink=MySink())
        sink = p.parse(f)  # file; use parsestring for a string

    The sink only needs a ``triple(s, p, o)`` method; it is called once for
    every statement parsed.
    """

    # Fallback label -> BNode map, kept only for subclasses that override
    # __init__ without calling it.  __init__ shadows it with a per-instance
    # dict so that blank-node labels are not shared between parsers.
    _bnode_ids = {}

    def __init__(self, sink=None):
        """Create a parser that reports triples to *sink*.

        When *sink* is None a default ``Sink`` is created.
        """
        if sink is not None:
            self.sink = sink
        else:
            self.sink = Sink()
        # Maps blank-node labels seen in the input to the BNodes generated
        # for them.  Per instance: two parsers reading unrelated documents
        # must not merge blank nodes that merely share a label.
        self._bnode_ids = {}

    def parse(self, f):
        """Parse f as an N-Triples file.

        *f* must have a ``read`` method yielding bytes; it is decoded as
        UTF-8.  Returns the sink.  Raises ParseError if *f* is not file-like
        or if a line cannot be parsed.
        """
        if not hasattr(f, 'read'):
            raise ParseError("Item to parse must be a file-like object.")

        # since N-Triples 1.1 files can and should be utf-8 encoded
        f = codecs.getreader('utf-8')(f)

        self.file = f
        self.buffer = ''
        while True:
            self.line = self.readline()
            if self.line is None:
                break
            try:
                self.parseline()
            except ParseError:
                # self.line holds whatever was left unconsumed on failure.
                raise ParseError("Invalid line: %r" % self.line)
        return self.sink

    def parsestring(self, s):
        """Parse s as an N-Triples string.

        Returns the sink, like parse().  Raises ParseError if *s* is not a
        string.
        """
        if not isinstance(s, basestring):
            raise ParseError("Item to parse must be a string instance.")
        try:
            from io import BytesIO
        except ImportError:
            from cStringIO import StringIO as BytesIO
        return self.parse(BytesIO(cast_bytes(s)))

    def readline(self):
        """Read an N-Triples line from buffered input.

        Returns the next line without its terminator, or None at end of
        input.
        """
        # N-Triples lines end in either CRLF, CR, or LF
        # Therefore, we can't just use f.readline()
        if not self.buffer:
            chunk = self.file.read(bufsiz)
            if not chunk:
                return None
            self.buffer = chunk

        while True:
            m = r_line.match(self.buffer)
            if m:  # the more likely prospect
                self.buffer = self.buffer[m.end():]
                return m.group(1)

            chunk = self.file.read(bufsiz)
            if not chunk:
                if self.buffer.isspace():
                    # Only trailing whitespace is left: nothing to return.
                    return None
                # Last line does not need to be terminated with a newline
                chunk = "\n"
            self.buffer += chunk

    def parseline(self):
        """Parse the statement in ``self.line`` and pass it to the sink.

        Empty lines and '#' comment lines are ignored.  Raises ParseError
        on malformed input.
        """
        self.eat(r_wspace)
        if (not self.line) or self.line.startswith('#'):
            return  # The line is empty or a comment

        subject = self.subject()
        self.eat(r_wspaces)

        predicate = self.predicate()
        self.eat(r_wspaces)

        obj = self.object()
        self.eat(r_tail)

        if self.line:
            raise ParseError("Trailing garbage")
        self.sink.triple(subject, predicate, obj)

    def peek(self, token):
        """Return True if the unparsed rest of the line starts with *token*."""
        return self.line.startswith(token)

    def eat(self, pattern):
        """Consume *pattern* from the start of the line.

        Returns the match object and removes the matched text from
        ``self.line``.  Raises ParseError if the pattern does not match.
        """
        m = pattern.match(self.line)
        if not m:
            raise ParseError("Failed to eat %s at %s"
                             % (pattern.pattern, self.line))
        self.line = self.line[m.end():]
        return m

    def subject(self):
        """Parse a subject: a URI reference or a blank node."""
        # @@ Consider using dictionary cases
        subj = self.uriref() or self.nodeid()
        if not subj:
            raise ParseError("Subject must be uriref or nodeID")
        return subj

    def predicate(self):
        """Parse a predicate, which must be a URI reference."""
        pred = self.uriref()
        if not pred:
            raise ParseError("Predicate must be uriref")
        return pred

    def object(self):
        """Parse an object: a URI reference, a blank node or a literal."""
        objt = self.uriref() or self.nodeid() or self.literal()
        # Identity test on purpose: an empty literal is falsy but valid.
        if objt is False:
            raise ParseError("Unrecognised object type")
        return objt

    def uriref(self):
        """Parse ``<uri>`` at the current position; return False if absent."""
        if self.peek('<'):
            uri = self.eat(r_uriref).group(1)
            uri = unquote(uri)
            uri = uriquote(uri)
            return URI(uri)
        return False

    def nodeid(self):
        """Parse ``_:label`` at the current position; return False if absent.

        Labels are not used as BNode ids directly: each distinct label is
        mapped to a freshly generated BNode, and later occurrences of the
        same label reuse that id.
        """
        if self.peek('_'):
            # Fix for https://github.com/RDFLib/rdflib/issues/204
            label = self.eat(r_nodeid).group(1)
            known_id = self._bnode_ids.get(label, None)
            if known_id is not None:
                # Re-map to the id already generated for this label
                return bNode(known_id)
            # Replace with a freshly-generated BNode and remember the mapping
            bnode = bNode()
            self._bnode_ids[label] = bnode
            return bnode
        return False

    def literal(self):
        """Parse a quoted literal with optional language tag or datatype.

        Returns False if the line does not start with a double quote.
        """
        if self.peek('"'):
            lit, lang, dtype = self.eat(r_literal).groups()
            # Normalise "absent" (None or empty) to None.
            lang = lang or None
            dtype = dtype or None
            if lang and dtype:
                raise ParseError("Can't have both a language and a datatype")
            lit = unquote(lit)
            return Literal(lit, lang, dtype)
        return False
# # Obsolete, unused # def parseURI(uri): # import urllib # parser = NTriplesParser() # u = urllib.urlopen(uri) # sink = parser.parse(u) # u.close() # # for triple in sink: # # print triple # print 'Length of input:', sink.length