#!/usr/bin/env python
u"""
notation3.py - Standalone Notation3 Parser
Derived from CWM, the Closed World Machine
Authors of the original suite:
* Dan Connolly <@@>
* Tim Berners-Lee <@@>
* Yosi Scharf <@@>
* Joseph M. Reagle Jr. <reagle@w3.org>
* Rich Salz <rsalz@zolera.com>
http://www.w3.org/2000/10/swap/notation3.py
Copyright 2000-2007, World Wide Web Consortium.
Copyright 2001, MIT.
Copyright 2001, Zolera Systems Inc.
License: W3C Software License
http://www.w3.org/Consortium/Legal/copyright-software
Modified by Sean B. Palmer
Copyright 2007, Sean B. Palmer.
Modified to work with rdflib by Gunnar Aastrand Grimnes
Copyright 2010, Gunnar A. Grimnes
"""
# Python standard libraries
import types
import sys
import os
import re
import codecs
import warnings
from decimal import Decimal
from uuid import uuid4
from rdflib.term import URIRef, BNode, Literal, Variable, _XSD_PFX, _unique_id
from rdflib.graph import QuotedGraph, ConjunctiveGraph, Graph
from rdflib import py3compat
b = py3compat.b
__all__ = ['BadSyntax', 'N3Parser', 'TurtleParser',
"splitFragP", "join", "base",
"runNamespace", "uniqueURI", "hexify"]
from rdflib.parser import Parser
def splitFragP(uriref, punct=0):
    """Split a URI reference just before its fragment.

    The split is made at the last ``#`` in ``uriref`` and the ``#`` is
    kept at the start of the fragment part.  When there is no ``#`` the
    fragment part is the empty string.  ``punct`` is accepted for
    compatibility but is not used.

    e.g.

    >>> splitFragP("abc#def")
    ('abc', '#def')

    >>> splitFragP("abcdef")
    ('abcdef', '')
    """
    hash_at = uriref.rfind("#")
    if hash_at < 0:
        return uriref, ''
    return uriref[:hash_at], uriref[hash_at:]
# NOTE(review): the %(u)s placeholders in the docstring are presumably filled
# in by format_doctest_out -- keep them intact and do not add other percent
# signs to the docstring.
@py3compat.format_doctest_out
def join(here, there):
    """join an absolute URI and URI reference
    (non-ascii characters are supported/doctested;
    haven't checked the details of the IRI spec though)

    ``here`` is assumed to be absolute.
    ``there`` is URI reference.

    >>> join('http://example/x/y/z', '../abc')
    'http://example/x/abc'

    Raise ValueError if there uses relative path
    syntax but here has no hierarchical path.

    >>> join('mid:foo@example', '../foo') # doctest: +NORMALIZE_WHITESPACE
    Traceback (most recent call last):
        raise ValueError(here)
    ValueError: Base <mid:foo@example> has no slash
    after colon - with relative '../foo'.

    >>> join('http://example/x/y/z', '')
    'http://example/x/y/z'

    >>> join('mid:foo@example', '#foo')
    'mid:foo@example#foo'

    We grok IRIs

    >>> len(%(u)s'Andr\\xe9')
    5

    >>> join('http://example.org/', %(u)s'#Andr\\xe9')
    %(u)s'http://example.org/#Andr\\xe9'
    """
    slashl = there.find('/')
    colonl = there.find(':')

    # join(base, 'foo:/') -- absolute: a colon before any slash means
    # ``there`` carries its own scheme.
    if colonl >= 0 and (slashl < 0 or colonl < slashl):
        return there

    bcolonl = here.find(':')
    assert(bcolonl >= 0), \
        "Base uri '%s' is not absolute" % here  # else it's not absolute

    path, frag = splitFragP(there)
    if not path:
        # Only a fragment (or nothing) to add.
        return here + frag

    # join('mid:foo@example', '../foo') bzzt
    if here[bcolonl + 1:bcolonl + 2] != '/':
        raise ValueError(
            ("Base <%s> has no slash after "
             "colon - with relative '%s'.") % (here, there))

    # bpath is the index where the path part of ``here`` starts.
    if here[bcolonl + 1:bcolonl + 3] == '//':
        bpath = here.find('/', bcolonl + 3)
    else:
        bpath = bcolonl + 1

    # join('http://xyz', 'foo')
    if bpath < 0:
        bpath = len(here)
        here = here + '/'

    # join('http://xyz/', '//abc') => 'http://abc'
    if there[:2] == '//':
        return here[:bcolonl + 1] + there

    # join('http://xyz/', '/abc') => 'http://xyz/abc'
    if there[:1] == '/':
        return here[:bpath] + there

    slashr = here.rfind('/')

    # Consume leading "./" and "../" segments of the relative path,
    # climbing one directory of ``here`` for each "../".
    while 1:
        if path[:2] == './':
            path = path[2:]
        if path == '.':
            path = ''
        elif path[:3] == '../' or path == '..':
            path = path[3:]
            i = here.rfind('/', bpath, slashr)
            if i >= 0:
                here = here[:i + 1]
                slashr = i
        else:
            break

    return here[:slashr + 1] + path + frag
def base():
    """The base URI for this process - the Web equiv of cwd

    Relative or absolute unix-standard filenames parsed relative to
    this yield the URI of the file.
    If we had a reliable way of getting a computer name,
    we should put it in the hostname just to prevent ambiguity

    Returns ``"file://"`` followed by the current working directory
    (as normalised by ``_fixslash``) and a trailing slash.
    """
    cwd = _fixslash(os.getcwd())
    return "file://" + cwd + "/"
def _fixslash(s):
    """Fix windowslike filename to unixlike - (#ifdef WINDOWS)

    Backslashes are turned into forward slashes and, when the name does
    not start with a slash and its second character is a colon, the
    first two characters (the drive letter and colon) are dropped.
    """
    s = s.replace("\\", "/")
    # Slices rather than indexing, so that "" and one-character names do
    # not raise IndexError.
    if s[:1] != "/" and s[1:2] == ":":
        s = s[2:]   # @@@ Hack when drive letter present
    return s
# Positions of the components of a statement quadruple; the parser builds
# quadruples as (context, predicate, subject, object).
CONTEXT = 0
PRED = 1
SUBJ = 2
OBJ = 3
PARTS = PRED, SUBJ, OBJ
ALL4 = CONTEXT, PRED, SUBJ, OBJ
# Tags used as the first element of (tag, value) pairs such as
# RDF_type = (SYMBOL, RDF_type_URI).
# NOTE(review): only SYMBOL is used in the visible part of this module; the
# other values are presumably the remaining term kinds -- confirm.
SYMBOL = 0
FORMULA = 1
LITERAL = 2
LITERAL_DT = 21
LITERAL_LANG = 22
ANONYMOUS = 3
XMLLITERAL = 25
# Well-known namespace and property URIs.
Logic_NS = "http://www.w3.org/2000/10/swap/log#"
NODE_MERGE_URI = Logic_NS + "is" # Pseudo-property indicating node merging
forSomeSym = Logic_NS + "forSome"
forAllSym = Logic_NS + "forAll"
RDF_type_URI = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
RDF_NS_URI = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"
OWL_NS = "http://www.w3.org/2002/07/owl#"
DAML_sameAs_URI = OWL_NS + "sameAs"
parsesTo_URI = Logic_NS + "parsesTo"
RDF_spec = "http://www.w3.org/TR/REC-rdf-syntax/"
List_NS = RDF_NS_URI # From 20030808
_Old_Logic_NS = "http://www.w3.org/2000/10/swap/log.n3#"
# (SYMBOL, uri) pairs for the list vocabulary, all built on List_NS.
N3_first = (SYMBOL, List_NS + "first")
N3_rest = (SYMBOL, List_NS + "rest")
N3_li = (SYMBOL, List_NS + "li")
N3_nil = (SYMBOL, List_NS + "nil")
N3_List = (SYMBOL, List_NS + "List")
N3_Empty = (SYMBOL, List_NS + "Empty")
# Cache for runNamespace(); filled in on the first call.
runNamespaceValue = None


def runNamespace():
    """Return a URI suitable as a namespace for run-local objects.

    The value is computed once, by joining ``base()`` with
    ``_unique_id()`` and appending ``#``, and then reused for every
    later call.
    """
    # @@@ include hostname (privacy?) (hash it?)
    global runNamespaceValue
    if runNamespaceValue is None:
        runNamespaceValue = join(base(), _unique_id()) + '#'
    return runNamespaceValue
# Counter behind uniqueURI(); incremented on every call.
nextu = 0


def uniqueURI():
    """Return a URI not handed out before by this process.

    The URI is ``runNamespace()`` followed by ``u_`` and the next value
    of the module-level counter ``nextu``.
    """
    global nextu
    nextu += 1
    return runNamespace() + "u_" + str(nextu)
# When true, SinkParser.__init__ records a BecauseOfData reason for the
# document it parses.
tracking = False
# NOTE(review): not referenced in the visible part of this module; presumably
# a verbosity level inherited from cwm -- confirm before relying on it.
chatty_flag = 50
# Stand-in for ``from why import BecauseOfData, becauseSubexpression``.
def BecauseOfData(*args, **kargs):
    """Do nothing: accept any arguments and return None."""
    return None
def becauseSubexpression(*args, **kargs):
    """Do nothing: accept any arguments and return None."""
    return None
# Aliases for the quantifier URIs defined above.
N3_forSome_URI = forSomeSym
N3_forAll_URI = forAllSym
# Magic resources we know about
ADDED_HASH = "#" # Stop where we use this in case we want to remove it!
# This is the hash on namespace URIs
RDF_type = (SYMBOL, RDF_type_URI)
DAML_sameAs = (SYMBOL, DAML_sameAs_URI)
LOG_implies_URI = "http://www.w3.org/2000/10/swap/log#implies"
# Datatype URIs, all built on the _XSD_PFX prefix imported from rdflib.term.
BOOLEAN_DATATYPE = _XSD_PFX + "boolean"
DECIMAL_DATATYPE = _XSD_PFX + "decimal"
DOUBLE_DATATYPE = _XSD_PFX + "double"
FLOAT_DATATYPE = _XSD_PFX + "float"
INTEGER_DATATYPE = _XSD_PFX + "integer"
option_noregen = 0 # If set, do not regenerate genids on output
# @@ I18n - the notname chars need extending for well known unicode non-text
# characters. The XML spec switched to assuming unknown things were name
# characters.
# _namechars = string.lowercase + string.uppercase + string.digits + '_-'
# Characters that end the local part of a qname (see SinkParser.qname).
_notQNameChars = \
"\t\r\n !\"#$&'()*,+/;<=>?@[\\]^`{|}~" # else valid qname :-/
# Characters that end a keyword, bare word or variable name.
_notKeywordsChars = _notQNameChars + "."
# Characters that end a prefix or a blank-node label.
_notNameChars = _notQNameChars + ":" # Assume anything else valid name :-/
_rdfns = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#'
# Digits accepted after '%' in a local name (checked in SinkParser.qname).
hexChars = 'ABCDEFabcdef0123456789'
escapeChars = "(_~.-!$&'()*+,;=/?#@%)" # valid for \ escapes in localnames
def unicodeExpand(m):
    """Turn a matched ``\\uXXXX`` / ``\\UXXXXXXXX`` escape into its character.

    ``m`` is a match object whose group 1 holds the hexadecimal digits.
    Raises ``Exception`` when the number is not a valid code point.
    """
    try:
        return unichr(int(m.group(1), 16))
    except (ValueError, OverflowError):
        # Only the errors unichr() raises for an out-of-range number are
        # translated; anything else is a programming error and propagates.
        raise Exception("Invalid unicode code point: " + m.group(1))


if py3compat.narrow_build:
    def unicodeExpand(m):
        """Variant of unicodeExpand used when py3compat.narrow_build is set.

        When ``unichr`` rejects the code point, a warning is issued and
        the whole escape is decoded with the ``unicode_escape`` codec
        instead of raising.
        """
        try:
            return unichr(int(m.group(1), 16))
        except ValueError:
            warnings.warn(
                'Encountered a unicode char > 0xFFFF in a narrow python build. '
                'Trying to degrade gracefully, but this can cause problems '
                'later when working with the string:\n%s' % m.group(0))
            return codecs.decode(m.group(0), 'unicode_escape')
# \uXXXX and \UXXXXXXXX escapes; group 1 is the hexadecimal code point.
unicodeEscape4 = re.compile(
    r'\\u([0-9a-fA-F]{4})')
unicodeEscape8 = re.compile(
    r'\\U([0-9a-fA-F]{8})')

N3CommentCharacter = "#"     # For unix script  # ! compatibility

########################################## Parse string to sink
#
# Regular expressions:
eol = re.compile(
    r'[ \t]*(#[^\n]*)?\r?\n')  # end  of line, poss. w/comment
eof = re.compile(
    r'[ \t]*(#[^\n]*)?$')      # end  of file, poss. w/comment
ws = re.compile(r'[ \t]*')     # Whitespace not including NL
signed_integer = re.compile(r'[-+]?[0-9]+')  # integer
integer_syntax = re.compile(r'[-+]?[0-9]+')
decimal_syntax = re.compile(r'[-+]?[0-9]*\.[0-9]+')
# A number with an exponent.  The mantissa may be "12.", "12.5", ".5" or
# "12"; the ".5" form accepts any number of fraction digits (the previous
# pattern allowed exactly one, so ".55e3" was not recognised).
exponent_syntax = re.compile(
    r'[-+]?(?:[0-9]+\.[0-9]*|\.[0-9]+|[0-9]+)(?:e|E)[-+]?[0-9]+')
digitstring = re.compile(r'[0-9]+')  # Unsigned integer
interesting = re.compile(r"""[\\\r\n\"\']""")
langcode = re.compile(r'[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*')
class SinkParser:
def __init__(self, store, openFormula=None, thisDoc="", baseURI=None,
             genPrefix="", why=None, turtle=False):
    """Prepare a parser that sends what it reads to ``store``.

    ``thisDoc`` must be absolute when given; it becomes the default
    (empty-prefix) namespace with ``#`` appended and, unless ``baseURI``
    is supplied, the base URI.  ``openFormula`` is the formula to add to;
    when omitted a new one is requested from the store.  ``turtle``
    switches on Turtle mode.

    note: namespace names should *not* end in # ;
    the # will get added during qname processing
    """
    self._bindings = {}
    if thisDoc != "":
        assert ':' in thisDoc, "Document URI not absolute: <%s>" % thisDoc
        self._bindings[""] = thisDoc + "#"  # default

    self._store = store
    if genPrefix:
        store.setGenPrefix(genPrefix)  # pass it on

    self._thisDoc = thisDoc
    self._genPrefix = genPrefix
    self._reason = why    # Why the parser was asked to parse this
    self.turtle = turtle  # raise exception when encountering N3 extensions

    # Position bookkeeping used for error messages and bnode names.
    self.lines = 0
    self.startOfLine = 0

    self.keywords = ['a', 'this', 'bind', 'has', 'is', 'of',
                     'true', 'false']
    self.keywordsSet = 0  # Then only can others be considerd qnames

    self._anonymousNodes = {}  # anon nodes already declared, ln: Term
    self._variables = {}
    self._parentVariables = {}

    # Turtle allows single or double quotes around strings, whereas N3
    # only allows double quotes.
    if turtle:
        self.string_delimiters = ('"', "'")
    else:
        self.string_delimiters = ('"',)

    self._reason2 = None  # Why these triples
    if tracking:  # was: diag.tracking
        self._reason2 = BecauseOfData(
            store.newSymbol(thisDoc), because=self._reason)

    # An explicit base wins, then the document URI, else no base at all.
    self._baseURI = baseURI or thisDoc or None
    assert not self._baseURI or ':' in self._baseURI

    if not self._genPrefix:
        if self._thisDoc:
            self._genPrefix = self._thisDoc + "#_g"
        else:
            self._genPrefix = uniqueURI()

    if openFormula is not None:
        self._formula = openFormula
    elif self._thisDoc:
        self._formula = store.newFormula(thisDoc + "#_formula")
    else:
        self._formula = store.newFormula()

    self._context = self._formula
    self._parentContext = None
def here(self, i):
"""String generated from position in file
This is for repeatability when refering people to bnodes in a document.
This has diagnostic uses less formally, as it should point one to which
bnode the arbitrary identifier actually is. It gives the
line and character number of the '[' charcacter or path character
which introduced the blank node. The first blank node is boringly
_L1C1. It used to be used only for tracking, but for tests in general
it makes the canonical ordering of bnodes repeatable."""
return "%s_L%iC%i" % (self._genPrefix, self.lines,
i - self.startOfLine + 1)
def formula(self):
return self._formula
def loadStream(self, stream):
    """Read all of ``stream`` and parse it; returns what loadBuf returns."""
    data = stream.read()  # Not ideal
    return self.loadBuf(data)
def loadBuf(self, buf):
    """Parse the buffer ``buf`` and return the top level formula.

    Calls startDoc(), feed(buf) and endDoc() in that order.
    """
    self.startDoc()
    self.feed(buf)
    top = self.endDoc()  # self._formula
    return top
def feed(self, octets):
    """Feed an octet stream to the parser.

    Input that is not already ``unicode`` is decoded as UTF-8 and a
    leading byte order mark is dropped.  Directives and statements are
    then parsed one after another until the end of the text.

    if BadSyntax is raised, the string
    passed in the exception object is the
    remainder after any statements have been parsed.
    So if there is more data to feed to the
    parser, it should be straightforward to recover.
    """
    if isinstance(octets, unicode):
        text = octets
    else:
        text = octets.decode('utf-8')
        # NB already decoded, so \ufeff
        if text[:1] == codecs.BOM_UTF8.decode('utf-8'):
            text = text[1:]

    pos = 0
    while pos >= 0:
        start = self.skipSpace(text, pos)
        if start < 0:
            return  # nothing but white space and comments left

        pos = self.directiveOrStatement(text, start)
        if pos < 0:
            self.BadSyntax(text, start,
                           "expected directive or statement")
def directiveOrStatement(self, argstr, h):
    """Parse one directive or one statement starting at ``h``.

    Returns the position after it, or a negative number at end of input
    or when neither could be parsed.  In Turtle mode a SPARQL-style
    directive is tried first and needs no terminating dot; everything
    else is passed through checkDot().
    """
    start = self.skipSpace(argstr, h)
    if start < 0:
        return start  # EOF

    if self.turtle:
        end = self.sparqlDirective(argstr, start)
        if end >= 0:
            return end

    end = self.directive(argstr, start)
    if end < 0:
        end = self.statement(argstr, start)
        if end < 0:
            return end
    return self.checkDot(argstr, end)
# @@I18N
# _namechars = string.lowercase + string.uppercase + string.digits + '_-'
def tok(self, tok, argstr, i, colon=False):
    """Check for keyword.  Space must have been stripped on entry.

    Returns the position just after the keyword, or -1 when the text at
    ``i`` is not that keyword.  The keyword must either be introduced by
    "@" or be listed in ``self.keywords``, and it must be followed by a
    character that cannot continue a keyword (the end of the input
    counts as such).

    if colon, then keyword followed by colon is ok
    (@prefix:<blah> is ok, rdf:type shortcut a must be followed by ws)
    """
    assert tok[0] not in _notNameChars  # not for punctuation
    if argstr[i:i + 1] == "@":
        i = i + 1
    else:
        if tok not in self.keywords:
            return -1  # No, this has neither keywords declaration nor "@"

    end = i + len(tok)
    # The keyword text itself has to match in every case.  The previous
    # "A and B or C" test let any text followed by ":" pass when colon
    # was set.
    if argstr[i:end] != tok:
        return -1

    # A slice, so that a keyword at the very end of the input yields ""
    # instead of raising IndexError.
    following = argstr[end:end + 1]
    if (following == ""
            or following in _notKeywordsChars
            or (colon and following == ':')):
        return end
    return -1
def sparqlTok(self, tok, argstr, i):
    """Check for SPARQL keyword.  Space must have been stripped on entry
    and we must not be at end of file.

    Case insensitive and not preceded by @.  Returns the position after
    the keyword, or -1 when the text at ``i`` is not the keyword or is
    not followed by a character from ``_notQNameChars``.
    """
    assert tok[0] not in _notNameChars  # not for punctuation

    end = i + len(tok)
    if argstr[i:end].lower() != tok.lower():
        return -1
    if argstr[end] not in _notQNameChars:
        return -1
    return end
def directive(self, argstr, i):
    """Parse an @-directive (or bare declared keyword) starting at ``i``.

    Handles ``keywords``, ``forAll``, ``forSome``, ``prefix`` and
    ``base``; ``bind`` is rejected as obsolete and the first three are
    rejected in Turtle mode.  Returns the position after the directive
    (the terminating "." is left for the caller), a negative number at
    end of input, or -1 when the text is not a directive.
    """
    j = self.skipSpace(argstr, i)
    if j < 0:
        return j  # eof
    res = []

    j = self.tok('bind', argstr, i)  # implied "#". Obsolete.
    if j > 0:
        self.BadSyntax(argstr, i,
                       "keyword bind is obsolete: use @prefix")

    j = self.tok('keywords', argstr, i)
    if j > 0:
        if self.turtle:
            self.BadSyntax(argstr, i, "Found 'keywords' when in Turtle mode.")
        i = self.commaSeparatedList(argstr, j, res, self.bareWord)
        if i < 0:
            # Report where the list should have started; i is negative
            # here and is not a position in the input.
            self.BadSyntax(argstr, j,
                           "'@keywords' needs comma separated list of words")
        self.setKeywords(res[:])
        return i

    j = self.tok('forAll', argstr, i)
    if j > 0:
        if self.turtle:
            self.BadSyntax(argstr, i, "Found 'forAll' when in Turtle mode.")
        i = self.commaSeparatedList(argstr, j, res, self.uri_ref2)
        if i < 0:
            self.BadSyntax(argstr, j,
                           "Bad variable list after @forAll")
        for x in res:
            # self._context.declareUniversal(x)
            if x not in self._variables or x in self._parentVariables:
                self._variables[x] = self._context.newUniversal(x)
        return i

    j = self.tok('forSome', argstr, i)
    if j > 0:
        if self.turtle:
            self.BadSyntax(argstr, i, "Found 'forSome' when in Turtle mode.")
        i = self.commaSeparatedList(argstr, j, res, self.uri_ref2)
        if i < 0:
            self.BadSyntax(argstr, j,
                           "Bad variable list after @forSome")
        for x in res:
            self._context.declareExistential(x)
        return i

    j = self.tok('prefix', argstr, i, colon=True)  # no implied "#"
    if j >= 0:
        t = []
        i = self.qname(argstr, j, t)
        if i < 0:
            self.BadSyntax(argstr, j,
                           "expected qname after @prefix")
        j = self.uri_ref2(argstr, i, t)
        if j < 0:
            self.BadSyntax(argstr, i,
                           "expected <uriref> after @prefix _qname_")
        ns = self.uriOf(t[1])

        if self._baseURI:
            ns = join(self._baseURI, ns)
        elif ":" not in ns:
            self.BadSyntax(argstr, j,
                           "With no base URI, cannot use " +
                           "relative URI in @prefix <" + ns + ">")
        assert ':' in ns  # must be absolute
        # t[0] is the (prefix, localname) pair produced by qname().
        self._bindings[t[0][0]] = ns
        self.bind(t[0][0], hexify(ns))
        return j

    j = self.tok('base', argstr, i)  # Added 2007/7/7
    if j >= 0:
        t = []
        i = self.uri_ref2(argstr, j, t)
        if i < 0:
            self.BadSyntax(argstr, j,
                           "expected <uri> after @base ")
        ns = self.uriOf(t[0])

        if self._baseURI:
            ns = join(self._baseURI, ns)
        elif ":" not in ns:
            # As for @prefix: only a relative URI needs an existing base.
            # An absolute URI is accepted as the first base.
            self.BadSyntax(argstr, j,
                           "With no previous base URI, cannot use " +
                           "relative URI in @base <" + ns + ">")
        assert ':' in ns  # must be absolute
        self._baseURI = ns
        return i

    return -1  # Not a directive, could be something else.
def sparqlDirective(self, argstr, i):
    """
    turtle and trig support BASE/PREFIX without @ and without
    terminating .

    Returns the position after the directive, a negative number at end
    of input, or -1 when the text at ``i`` is neither PREFIX nor BASE.
    """
    j = self.skipSpace(argstr, i)
    if j < 0:
        return j  # eof

    j = self.sparqlTok('PREFIX', argstr, i)
    if j >= 0:
        t = []
        i = self.qname(argstr, j, t)
        if i < 0:
            self.BadSyntax(argstr, j,
                           "expected qname after @prefix")
        j = self.uri_ref2(argstr, i, t)
        if j < 0:
            self.BadSyntax(argstr, i,
                           "expected <uriref> after @prefix _qname_")
        ns = self.uriOf(t[1])

        if self._baseURI:
            ns = join(self._baseURI, ns)
        elif ":" not in ns:
            self.BadSyntax(argstr, j,
                           "With no base URI, cannot use " +
                           "relative URI in @prefix <" + ns + ">")
        assert ':' in ns  # must be absolute
        # t[0] is the (prefix, localname) pair produced by qname().
        self._bindings[t[0][0]] = ns
        self.bind(t[0][0], hexify(ns))
        return j

    j = self.sparqlTok('BASE', argstr, i)
    if j >= 0:
        t = []
        i = self.uri_ref2(argstr, j, t)
        if i < 0:
            self.BadSyntax(argstr, j,
                           "expected <uri> after @base ")
        ns = self.uriOf(t[0])

        if self._baseURI:
            ns = join(self._baseURI, ns)
        elif ":" not in ns:
            # As for PREFIX: only a relative URI needs an existing base.
            # An absolute URI is accepted as the first base.
            self.BadSyntax(argstr, j,
                           "With no previous base URI, cannot use " +
                           "relative URI in @base <" + ns + ">")
        assert ':' in ns  # must be absolute
        self._baseURI = ns
        return i

    return -1  # Not a directive, could be something else.
def bind(self, qn, uri):
    """Tell the store that prefix ``qn`` stands for namespace ``uri``.

    An empty prefix sets the store's default namespace instead.
    ``uri`` must be a plain (byte) string.
    """
    assert isinstance(
        uri, types.StringType), "Any unicode must be %x-encoded already"
    store = self._store
    if qn != "":
        store.bind(qn, uri)
    else:
        store.setDefaultNamespace(uri)
def setKeywords(self, k):
"Takes a list of strings"
if k is None:
self.keywordsSet = 0
else:
self.keywords = k
self.keywordsSet = 1
def startDoc(self):
    """Tell the store that the document for the top formula begins."""
    # was: self._store.startDoc()
    top = self._formula
    self._store.startDoc(top)
def endDoc(self):
    """Signal end of document and stop parsing. returns formula"""
    top = self._formula
    self._store.endDoc(top)  # don't canonicalize yet
    return self._formula
def makeStatement(self, quadruple):
    """Hand one (context, predicate, subject, object) tuple to the store,
    together with the current reason ``self._reason2``."""
    reason = self._reason2
    self._store.makeStatement(quadruple, why=reason)
def statement(self, argstr, i):
    """Parse a subject followed by its property list.

    Returns the position after the statement, or a negative number when
    no subject could be read at ``i``.
    """
    subjects = []
    # Allow literal for subject - extends RDF
    after_subject = self.object(argstr, i, subjects)
    if after_subject < 0:
        return after_subject

    end = self.property_list(argstr, after_subject, subjects[0])
    if end < 0:
        self.BadSyntax(
            argstr, after_subject, "expected propertylist")
    return end
def subject(self, argstr, i, res):
    """Parse a subject; this is simply the ``item`` production."""
    end = self.item(argstr, i, res)
    return end
def verb(self, argstr, i, res):
    """ has _prop_
    is _prop_ of
    a
    =
    _prop_
    >- prop ->
    <- prop -<
    _operator_

    On success a ``(direction, property)`` pair is appended to ``res``
    and the position after the verb is returned.  The direction is
    '->' for a forward property and '<-' for a reversed one.  Returns a
    negative number at end of input or when no verb is found.
    """
    j = self.skipSpace(argstr, i)
    if j < 0:
        return j  # eof

    r = []

    j = self.tok('has', argstr, i)
    if j >= 0:
        if self.turtle:
            self.BadSyntax(argstr, i, "Found 'has' keyword in Turtle mode")
        i = self.prop(argstr, j, r)
        if i < 0:
            self.BadSyntax(argstr, j,
                           "expected property after 'has'")
        res.append(('->', r[0]))
        return i

    j = self.tok('is', argstr, i)
    if j >= 0:
        if self.turtle:
            self.BadSyntax(argstr, i, "Found 'is' keyword in Turtle mode")
        i = self.prop(argstr, j, r)
        if i < 0:
            self.BadSyntax(argstr, j,
                           "expected <property> after 'is'")
        j = self.skipSpace(argstr, i)
        if j < 0:
            self.BadSyntax(argstr, i,
                           "End of file found, expected property after 'is'")
        i = j
        j = self.tok('of', argstr, i)
        if j < 0:
            self.BadSyntax(argstr, i,
                           "expected 'of' after 'is' <prop>")
        res.append(('<-', r[0]))
        return j

    j = self.tok('a', argstr, i)
    if j >= 0:
        res.append(('->', RDF_type))
        return j

    if argstr[i:i + 2] == "<=":
        if self.turtle:
            self.BadSyntax(argstr, i,
                           "Found '<=' in Turtle mode. ")
        res.append(('<-', self._store.newSymbol(Logic_NS + "implies")))
        return i + 2

    if argstr[i:i + 1] == "=":
        if self.turtle:
            self.BadSyntax(argstr, i, "Found '=' in Turtle mode")
        if argstr[i + 1:i + 2] == ">":
            res.append(('->', self._store.newSymbol(Logic_NS + "implies")))
            return i + 2
        res.append(('->', DAML_sameAs))
        return i + 1

    if argstr[i:i + 2] == ":=":
        if self.turtle:
            self.BadSyntax(argstr, i, "Found ':=' in Turtle mode")
        # patch file relates two formulae, uses this    @@ really?
        # NOTE(review): unlike the branches above this appends a plain
        # string rather than a store symbol -- confirm that is intended.
        res.append(('->', Logic_NS + "becomes"))
        return i + 2

    j = self.prop(argstr, i, r)
    if j >= 0:
        res.append(('->', r[0]))
        return j

    if argstr[i:i + 2] == ">-" or argstr[i:i + 2] == "<-":
        # j is negative here (prop() just failed), so report at i.
        self.BadSyntax(argstr, i,
                       ">- ... -> syntax is obsolete.")

    return -1
def prop(self, argstr, i, res):
    """Parse a property; this is simply the ``item`` production."""
    end = self.item(argstr, i, res)
    return end
def item(self, argstr, i, res):
    """Parse an item; this is simply the ``path`` production."""
    end = self.path(argstr, i, res)
    return end
def blankNode(self, uri=None):
    """Ask the store for a new blank node in the current context.

    ``uri`` is passed on to the store as given.
    """
    store = self._store
    return store.newBlankNode(self._context, uri, why=self._reason2)
def path(self, argstr, i, res):
    """Parse the path production.

    A path is a node or literal followed by any number of ``!node`` or
    ``^node`` steps.  Each step creates a blank node, emits one
    statement linking it to the value so far (forward for "!", reversed
    for "^") and makes the blank node the new value.  The final value is
    appended to ``res``.  Returns the position after the path, or a
    negative number when nothing could be read at ``i``.
    """
    j = self.nodeOrLiteral(argstr, i, res)
    if j < 0:
        return j  # nope

    # no spaces, must follow exactly (?)
    # Tuple membership on purpose: at the end of the input the slice is
    # "" and ``"" in "!^"`` would be true.
    while argstr[j:j + 1] in ("!", "^"):
        ch = argstr[j:j + 1]
        op_at = j  # where the operator is, for error reporting
        subj = res.pop()
        obj = self.blankNode(uri=self.here(j))
        j = self.node(argstr, j + 1, res)
        if j < 0:
            # j is negative here, so report at the operator.
            self.BadSyntax(argstr, op_at,
                           "EOF found in middle of path syntax")
        pred = res.pop()
        if ch == "^":  # Reverse traverse
            self.makeStatement((self._context, pred, obj, subj))
        else:
            self.makeStatement((self._context, pred, subj, obj))
        res.append(obj)
    return j
def anonymousNode(self, ln):
    """Remember or generate a term for one of these _: anonymous nodes.

    The same label always yields the same term for as long as the
    current ``self._anonymousNodes`` table is in use.
    """
    known = self._anonymousNodes
    node = known.get(ln, None)
    if node is None:
        node = self._store.newBlankNode(self._context, why=self._reason2)
        known[ln] = node
    return node
def node(self, argstr, i, res, subjectAlready=None):
    """Parse the <node> production.
    Space is now skipped once at the beginning
    instead of in multipe calls to self.skipSpace().

    Recognises ``[ ... ]`` blank nodes with property lists, ``{ ... }``
    formulae and ``{$ ... $}`` sets (not in Turtle mode), ``( ... )``
    lists and ``($ ... )`` sets, the booleans ``true`` / ``false`` and,
    when ``subjectAlready`` is None, named nodes.  The parsed value is
    appended to ``res``.  ``subjectAlready`` is an existing term that
    the bracketed property list or formula should describe.  Returns
    the position after the node, or a negative number when there is no
    node at ``i``.
    """
    subj = subjectAlready

    j = self.skipSpace(argstr, i)
    if j < 0:
        return j  # eof
    i = j
    ch = argstr[i:i + 1]  # Quick 1-character checks first:

    if ch == "[":
        bnodeID = self.here(i)
        j = self.skipSpace(argstr, i + 1)
        if j < 0:
            self.BadSyntax(argstr, i,
                           "EOF after '['")
        # Hack for "is" binding name to anon node
        if argstr[j:j + 1] == "=":
            if self.turtle:
                self.BadSyntax(argstr, j, "Found '[=' or '[ =' when in turtle mode.")
            i = j + 1
            objs = []
            j = self.objectList(argstr, i, objs)
            if j >= 0:
                subj = objs[0]
                if len(objs) > 1:
                    for obj in objs:
                        self.makeStatement((self._context,
                                            DAML_sameAs, subj, obj))
                j = self.skipSpace(argstr, j)
                if j < 0:
                    self.BadSyntax(argstr, i,
                                   "EOF when objectList expected after [ = ")
                if argstr[j:j + 1] == ";":
                    j = j + 1
            else:
                self.BadSyntax(argstr, i,
                               "objectList expected after [= ")

        if subj is None:
            subj = self.blankNode(uri=bnodeID)

        i = self.property_list(argstr, j, subj)
        if i < 0:
            self.BadSyntax(argstr, j,
                           "property_list expected")

        j = self.skipSpace(argstr, i)
        if j < 0:
            self.BadSyntax(argstr, i,
                           "EOF when ']' expected after [ <propertyList>")
        if argstr[j:j + 1] != "]":
            self.BadSyntax(argstr, j,
                           "']' expected")
        res.append(subj)
        return j + 1

    if not self.turtle and ch == "{":
        ch2 = argstr[i + 1:i + 2]
        if ch2 == '$':
            # a set
            i += 1
            j = i + 1
            List = []
            first_run = True
            while 1:
                i = self.skipSpace(argstr, j)
                if i < 0:
                    # i is negative here; report where scanning resumed.
                    self.BadSyntax(argstr, j,
                                   "needed '$}', found end.")
                if argstr[i:i + 2] == '$}':
                    j = i + 2
                    break

                # Every item after the first must be preceded by a comma.
                if not first_run:
                    if argstr[i:i + 1] == ',':
                        i += 1
                    else:
                        self.BadSyntax(
                            argstr, i, "expected: ','")
                else:
                    first_run = False

                item = []
                j = self.item(
                    argstr, i, item)  # @@@@@ should be path, was object
                if j < 0:
                    self.BadSyntax(argstr, i,
                                   "expected item in set or '$}'")
                List.append(self._store.intern(item[0]))
            res.append(self._store.newSet(List, self._context))
            return j
        else:
            # parse a formula
            j = i + 1
            # Save the enclosing scope; it is restored after the "}".
            oldParentContext = self._parentContext
            self._parentContext = self._context
            parentAnonymousNodes = self._anonymousNodes
            grandParentVariables = self._parentVariables
            self._parentVariables = self._variables
            self._anonymousNodes = {}
            self._variables = self._variables.copy()
            reason2 = self._reason2
            self._reason2 = becauseSubexpression
            if subj is None:
                subj = self._store.newFormula()
            self._context = subj

            while 1:
                i = self.skipSpace(argstr, j)
                if i < 0:
                    # i is negative here; report where scanning resumed.
                    self.BadSyntax(
                        argstr, j, "needed '}', found end.")

                if argstr[i:i + 1] == "}":
                    j = i + 1
                    break

                j = self.directiveOrStatement(argstr, i)
                if j < 0:
                    self.BadSyntax(
                        argstr, i, "expected statement or '}'")

            self._anonymousNodes = parentAnonymousNodes
            self._variables = self._parentVariables
            self._parentVariables = grandParentVariables
            self._context = self._parentContext
            self._reason2 = reason2
            self._parentContext = oldParentContext
            res.append(subj.close())  # No use until closed
            return j

    if ch == "(":
        thing_type = self._store.newList
        ch2 = argstr[i + 1:i + 2]
        if ch2 == '$':
            thing_type = self._store.newSet
            i += 1
        j = i + 1

        List = []
        while 1:
            i = self.skipSpace(argstr, j)
            if i < 0:
                # i is negative here; report where scanning resumed.
                self.BadSyntax(
                    argstr, j, "needed ')', found end.")
            if argstr[i:i + 1] == ')':
                j = i + 1
                break

            item = []
            j = self.item(
                argstr, i, item)  # @@@@@ should be path, was object
            if j < 0:
                self.BadSyntax(argstr, i,
                               "expected item in list or ')'")
            List.append(self._store.intern(item[0]))
        res.append(thing_type(List, self._context))
        return j

    j = self.tok('this', argstr, i)  # This context
    if j >= 0:
        self.BadSyntax(argstr, i,
                       "Keyword 'this' was ancient N3. Now use " +
                       "@forSome and @forAll keywords.")

    # booleans
    j = self.tok('true', argstr, i)
    if j >= 0:
        res.append(True)
        return j
    j = self.tok('false', argstr, i)
    if j >= 0:
        res.append(False)
        return j

    if subj is None:  # If this can be a named node, then check for a name.
        j = self.uri_ref2(argstr, i, res)
        if j >= 0:
            return j

    return -1
def property_list(self, argstr, i, subj):
    """Parse property list
    Leaves the terminating punctuation in the buffer

    Reads ``verb objectList`` groups separated by ";" and emits one
    statement about ``subj`` per object.  A ``:-`` followed by a node
    makes that node describe ``subj`` as well.  Returns the position of
    the first character that is not part of the list.
    """
    while 1:
        while 1:  # skip repeat ;
            j = self.skipSpace(argstr, i)
            if j < 0:
                self.BadSyntax(argstr, i,
                               "EOF found when expected verb in property list")
            if argstr[j] != ';':
                break
            i = j + 1

        if argstr[j:j + 2] == ":-":
            if self.turtle:
                self.BadSyntax(argstr, j, "Found in ':-' in Turtle mode")
            i = j + 2
            res = []
            j = self.node(argstr, i, res, subj)
            if j < 0:
                self.BadSyntax(argstr, i,
                               "bad {} or () or [] node after :- ")
            i = j
            continue
        i = j
        v = []
        j = self.verb(argstr, i, v)
        if j <= 0:
            return i  # void but valid

        objs = []
        i = self.objectList(argstr, j, objs)
        if i < 0:
            self.BadSyntax(argstr, j,
                           "objectList expected")
        for obj in objs:
            dira, sym = v[0]
            if dira == '->':
                self.makeStatement((self._context, sym, subj, obj))
            else:
                self.makeStatement((self._context, sym, obj, subj))

        j = self.skipSpace(argstr, i)
        if j < 0:
            # j is negative here, so report at i.
            self.BadSyntax(argstr, i,
                           "EOF found in list of objects")
        if argstr[i:i + 1] != ";":
            return i
        i = i + 1  # skip semicolon and continue
def commaSeparatedList(self, argstr, j, res, what):
    """return value: -1 bad syntax; >1 new position in argstr
    res has things found appended

    ``what`` is the parse method called for each element, as
    ``what(argstr, position, res)``.  The list ends at a "." which is
    left in the buffer.  A "." straight away means an empty list, in
    which case ``j`` itself is returned.
    """
    i = self.skipSpace(argstr, j)
    if i < 0:
        # i is negative here, so report at the starting position.
        self.BadSyntax(argstr, j,
                       "EOF found expecting comma sep list")
    if argstr[i] == ".":
        return j  # empty list is OK
    i = what(argstr, i, res)
    if i < 0:
        return -1

    while 1:
        j = self.skipSpace(argstr, i)
        if j < 0:
            return j  # eof
        ch = argstr[j:j + 1]
        if ch != ",":
            if ch != ".":
                return -1
            return j  # Found  but not swallowed "."
        i = what(argstr, j + 1, res)
        if i < 0:
            # i is negative here; report just after the comma.
            self.BadSyntax(argstr, j + 1,
                           "bad list content")
def objectList(self, argstr, i, res):
    """Parse one or more objects separated by commas.

    The objects are appended to ``res``.  Returns the position of the
    first non-space character after the last object, or a negative
    number when an object could not be read.
    """
    i = self.object(argstr, i, res)
    if i < 0:
        return -1
    while 1:
        j = self.skipSpace(argstr, i)
        if j < 0:
            # j is negative here; report just after the last object.
            self.BadSyntax(argstr, i,
                           "EOF found after object")
        if argstr[j:j + 1] != ",":
            return j  # Found something else!
        i = self.object(argstr, j + 1, res)
        if i < 0:
            return i
def checkDot(self, argstr, i):
    """Check that a directive or statement is properly terminated.

    A "." is consumed; a "}" or "]" is accepted but left in the buffer.
    Returns the resulting position, or a negative number at end of
    input.  Anything else is reported through BadSyntax().
    """
    j = self.skipSpace(argstr, i)
    if j < 0:
        return j  # eof
    closer = argstr[j:j + 1]
    if closer == ".":
        return j + 1  # skip
    if closer in ("}", "]"):
        return j  # don't skip it
    self.BadSyntax(argstr, j,
                   "expected '.' or '}' or ']' at end of statement")
def uri_ref2(self, argstr, i, res):
    """Generate uri from n3 representation.

    Note that the RDF convention of directly concatenating
    NS and local name is now used though I prefer inserting a '#'
    to make the namesapces look more like what XML folks expect.

    Accepts a qname, a ``?variable``, a ``<uri>`` or, once keywords
    have been declared, a bare word.  The resulting term is appended to
    ``res``.  Returns the position after it, or -1 when none of these
    is found at ``i``.
    """
    qn = []
    j = self.qname(argstr, i, qn)
    if j >= 0:
        pfx, ln = qn[0]
        if pfx is None:
            assert 0, "not used?"
            ns = self._baseURI + ADDED_HASH
        else:
            try:
                ns = self._bindings[pfx]
            except KeyError:
                if pfx == "_":  # Magic prefix 2001/05/30, can be changed
                    res.append(self.anonymousNode(ln))
                    return j
                if not self.turtle and pfx == "":
                    ns = join(self._baseURI or "", "#")
                else:
                    self.BadSyntax(argstr, i,
                                   "Prefix \"%s:\" not bound" % (pfx))
        symb = self._store.newSymbol(ns + ln)
        if symb in self._variables:
            res.append(self._variables[symb])
        else:
            res.append(symb)  # @@@ "#" CONVENTION
        return j

    i = self.skipSpace(argstr, i)
    if i < 0:
        return -1

    if argstr[i] == "?":
        v = []
        j = self.variable(argstr, i, v)
        if j > 0:  # Forget varibles as a class, only in context.
            res.append(v[0])
            return j
        return -1

    elif argstr[i] == "<":
        i = i + 1
        st = i  # first character after the "<"
        while i < len(argstr):
            if argstr[i] == ">":
                uref = argstr[st:i]  # the join should dealt with "":

                # expand unicode escapes
                uref = unicodeEscape8.sub(unicodeExpand, uref)
                uref = unicodeEscape4.sub(unicodeExpand, uref)

                if self._baseURI:
                    uref = join(self._baseURI, uref)  # was: uripath.join
                else:
                    assert ":" in uref, \
                        "With no base URI, cannot deal with relative URIs"
                if argstr[i - 1:i] == "#" and not uref[-1:] == "#":
                    uref = uref + \
                        "#"  # She meant it! Weirdness in urlparse?
                symb = self._store.newSymbol(uref)
                if symb in self._variables:
                    res.append(self._variables[symb])
                else:
                    res.append(symb)
                return i + 1
            i = i + 1
        # j is -1 here (the qname attempt failed), so point at the "<".
        self.BadSyntax(argstr, st - 1,
                       "unterminated URI reference")

    elif self.keywordsSet:
        v = []
        j = self.bareWord(argstr, i, v)
        if j < 0:
            return -1  # Forget varibles as a class, only in context.
        if v[0] in self.keywords:
            self.BadSyntax(argstr, i,
                           'Keyword "%s" not allowed here.' % v[0])
        res.append(self._store.newSymbol(self._bindings[""] + v[0]))
        return j
    else:
        return -1
def skipSpace(self, argstr, i):
    """Skip white space, newlines and comments.
    return -1 if EOF, else position of first non-ws character

    ``self.lines`` and ``self.startOfLine`` are updated for every line
    end that is passed.
    """
    pos = i
    line_end = eol.match(argstr, pos)
    while line_end is not None:
        self.lines = self.lines + 1
        pos = line_end.end()  # Point to first character unmatched
        self.startOfLine = pos
        line_end = eol.match(argstr, pos)

    blanks = ws.match(argstr, pos)
    if blanks is not None:
        pos = blanks.end()

    if eof.match(argstr, pos) is not None:
        return -1
    return pos
def variable(self, argstr, i, res):
    """ ?abc -> variable(:abc)

    The variable is looked up by the symbol ``<baseURI>#<name>``.  At
    the outermost level it belongs to the current context; inside a
    nested formula it is declared in the parent context.  A universal
    is created the first time a name is seen.  The variable is appended
    to ``res`` and the position after the name is returned, or -1 when
    there is no "?" at ``i``.
    """
    j = self.skipSpace(argstr, i)
    if j < 0:
        return -1

    if argstr[j:j + 1] != "?":
        return -1
    j = j + 1
    i = j
    # Guard on the length so that a "?" at the very end of the input
    # does not raise IndexError.
    if j < len(argstr) and argstr[j] in "0123456789-":
        self.BadSyntax(argstr, j,
                       "Varible name can't start with '%s'" % argstr[j])
    while i < len(argstr) and argstr[i] not in _notKeywordsChars:
        i = i + 1

    # @@ was:
    # self.BadSyntax(argstr, j,
    #     "Can't use ?xxx syntax for variable in outermost level: %s"
    #     % argstr[j-1:i])
    if self._parentContext is None:
        scope, context = self._variables, self._context
    else:
        scope, context = self._parentVariables, self._parentContext

    varURI = self._store.newSymbol(self._baseURI + "#" + argstr[j:i])
    if varURI not in scope:
        scope[varURI] = context.newUniversal(
            varURI, why=self._reason2)
    res.append(scope[varURI])
    return i
def bareWord(self, argstr, i, res):
    """ abc -> :abc

    Appends the word found at ``i`` (after white space) to ``res`` and
    returns the position after it.  Returns -1 at end of input or when
    the first character is a digit, "-" or one of ``_notKeywordsChars``.
    """
    start = self.skipSpace(argstr, i)
    if start < 0:
        return -1

    first = argstr[start]
    if first in "0123456789-" or first in _notKeywordsChars:
        return -1

    end = start
    limit = len(argstr)
    while end < limit and argstr[end] not in _notKeywordsChars:
        end += 1
    res.append(argstr[start:end])
    return end
def qname(self, argstr, i, res):
    """
    xyz:def -> ('xyz', 'def')
    If not in keywords and keywordsSet: def -> ('', 'def')
    :def -> ('', 'def')

    The ``(prefix, localname)`` pair is appended to ``res`` and the
    position after the name is returned; -1 means there is no qname at
    ``i``.  Backslash escapes and ``%XX`` escapes in the local name are
    checked and BadSyntax is raised for illegal ones.
    """
    i = self.skipSpace(argstr, i)
    if i < 0:
        return -1

    c = argstr[i]
    if c in "0123456789-+.":
        return -1
    if c not in _notNameChars:
        ln = c
        i = i + 1
        while i < len(argstr):
            c = argstr[i]
            if c not in _notNameChars:
                ln = ln + c
                i = i + 1
            else:
                break

        if argstr[i - 1] == ".":  # qname cannot end with "."
            ln = ln[:-1]
            if not ln:
                return -1
            i -= 1

    else:  # First character is non-alpha
        ln = ''  # Was:  None - TBL (why? useful?)

    if i < len(argstr) and argstr[i] == ':':
        pfx = ln
        # bnodes names have different rules
        if pfx == '_':
            allowedChars = _notNameChars
        else:
            allowedChars = _notQNameChars

        i = i + 1
        lastslash = False  # true right after an unescaped backslash
        # start = i # TODO first char .
        ln = ''
        while i < len(argstr):
            c = argstr[i]
            if not lastslash and c == '\\':
                lastslash = True
                i += 1

            elif lastslash or c not in allowedChars:

                if lastslash:
                    if c not in escapeChars:
                        # self.lines is the line counter; the previous
                        # self.line does not exist and raised
                        # AttributeError instead of BadSyntax.
                        raise BadSyntax(self._thisDoc, self.lines, argstr, i,
                                        "illegal escape "+c)
                elif c == '%':
                    # The length test keeps a truncated escape at the end
                    # of the input from raising IndexError.
                    if (len(argstr) < i + 3
                            or argstr[i+1] not in hexChars
                            or argstr[i+2] not in hexChars):
                        raise BadSyntax(self._thisDoc, self.lines, argstr, i,
                                        "illegal hex escape "+c)

                ln = ln + c
                i = i + 1
                lastslash = False
            else:
                break

        if lastslash:
            raise BadSyntax(
                self._thisDoc, self.lines, argstr, i,
                "qname cannot end with \\")

        if argstr[i-1] == '.':
            # localname cannot end in .
            ln = ln[:-1]
            if not ln:
                return -1
            i -= 1

        res.append((pfx, ln))
        return i

    else:  # delimiter was not ":"
        if ln and self.keywordsSet and ln not in self.keywords:
            res.append(('', ln))
            return i
        return -1
def object(self, argstr, i, res):
    """Parse an object: whatever ``subject`` accepts, or a plain string.

    ``subject`` is tried first.  If it fails, whitespace is skipped and a
    string constant introduced by one of ``self.string_delimiters``
    (single or tripled) is read with ``strconst``; the resulting literal
    is appended to ``res``.

    Returns the index just past the parsed item, or -1 if nothing
    matched.
    """
    j = self.subject(argstr, i, res)
    if j >= 0:
        return j

    j = self.skipSpace(argstr, i)
    if j < 0:
        return -1
    i = j

    if argstr[i] not in self.string_delimiters:
        return -1

    quote = argstr[i]
    if argstr[i:i + 3] == quote * 3:
        delim = quote * 3
    else:
        delim = quote
    i = i + len(delim)

    j, s = self.strconst(argstr, i, delim)

    # newLiteral takes (text, datatype, language); this path has neither
    # a datatype nor a language tag, so pass None for both explicitly.
    res.append(self._store.newLiteral(s, None, None))
    return j
def nodeOrLiteral(self, argstr, i, res):
    """Parse a node, a numeric literal or a string literal.

    ``node`` is tried first.  Otherwise, after skipping whitespace:

    * text starting with a sign, digit or ``.`` is matched against
      ``exponent_syntax`` (appended as ``float``), ``decimal_syntax``
      (``Decimal``) and ``integer_syntax`` (``long``), in that order;
    * text starting with a string delimiter is read with ``strconst``,
      optionally followed by ``@lang`` and/or ``^^datatype``, and the
      literal built by ``self._store.newLiteral`` is appended.

    Returns the index just past the parsed item, or -1 if nothing
    matched.  Raises BadSyntax for a malformed language tag or when no
    datatype could be read after ``^^``.
    """
    j = self.node(argstr, i, res)
    startline = self.lines  # Remember where for error messages
    if j >= 0:
        return j

    j = self.skipSpace(argstr, i)
    if j < 0:
        return -1
    i = j

    ch = argstr[i]
    if ch in "-+0987654321.":
        m = exponent_syntax.match(argstr, i)
        if m:
            j = m.end()
            res.append(float(argstr[i:j]))
            return j

        m = decimal_syntax.match(argstr, i)
        if m:
            j = m.end()
            res.append(Decimal(argstr[i:j]))
            return j

        m = integer_syntax.match(argstr, i)
        if m:
            j = m.end()
            res.append(long(argstr[i:j]))
            return j

        # No numeric form matched: fall through to the string check.

    if argstr[i] not in self.string_delimiters:
        return -1

    if argstr[i:i + 3] == argstr[i] * 3:
        delim = argstr[i] * 3
    else:
        delim = argstr[i]
    i = i + len(delim)

    dt = None
    j, s = self.strconst(argstr, i, delim)
    lang = None
    if argstr[j:j + 1] == "@":  # Language?
        m = langcode.match(argstr, j + 1)
        if m is None:
            raise BadSyntax(
                self._thisDoc, startline, argstr, i,
                "Bad language code syntax on string " +
                "literal, after @")
        i = m.end()
        lang = argstr[j + 1:i]
        j = i
    if argstr[j:j + 2] == "^^":
        res2 = []
        dt_start = j + 2
        j = self.uri_ref2(argstr, dt_start, res2)  # Read datatype URI
        if j < 0 or not res2:
            # Without this check a missing datatype surfaces as an
            # IndexError on res2[0] instead of a syntax error.
            raise BadSyntax(
                self._thisDoc, startline, argstr, dt_start,
                "expected datatype URI after ^^")
        dt = res2[0]
    res.append(self._store.newLiteral(s, dt, lang))
    return j
def uriOf(self, sym):
    """Return the URI for ``sym``.

    A tuple is treated as the old ``--pipe`` representation and its
    second element is returned; anything else is returned unchanged.
    """
    # isinstance(..., tuple) replaces the Python-2-only types.TupleType
    # and matches the tuple check used by RDFSink.normalise.
    if isinstance(sym, tuple):
        return sym[1]  # old system for --pipe
    # return sym.uriref()  # cwm api
    return sym
def strconst(self, argstr, i, delim):
    """parse an N3 string constant delimited by delim.

    ``i`` is the index of the first character after the opening
    delimiter; ``delim`` is one quote character or the same character
    three times.

    return index, val -- the index just past the closing delimiter and
    the decoded string value.

    Raises BadSyntax for a newline inside a single-delimiter string, a
    bad or truncated escape, or an unterminated string.  Raises
    AssertionError when no quote, backslash or newline remains in the
    input.  ``self.lines`` and ``self.startOfLine`` are updated for each
    newline consumed inside a long string.
    """
    delim1 = delim[0]
    delim2, delim3, delim4, delim5 = (
        delim1 * 2, delim1 * 3, delim1 * 4, delim1 * 5)

    j = i
    ustr = u""  # Empty unicode string
    startline = self.lines  # Remember where for error messages
    while j < len(argstr):
        if argstr[j] == delim1:
            if delim == delim1:  # done when delim is " or '
                i = j + 1
                return i, ustr
            if delim == delim3:
                # done when delim is """ or ''' and, respectively ...
                if argstr[j:j + 5] == delim5:  # ... we have "" or '' before
                    i = j + 5
                    ustr = ustr + delim2
                    return i, ustr
                if argstr[j:j + 4] == delim4:  # ... we have " or ' before
                    i = j + 4
                    ustr = ustr + delim1
                    return i, ustr
                if argstr[j:j + 3] == delim3:
                    # current " or ' is part of delim
                    i = j + 3
                    return i, ustr

                # we are inside of the string and current char is " or '
                j = j + 1
                ustr = ustr + delim1
                continue

        m = interesting.search(argstr, j)  # was argstr[j:].
        # Note for pos param to work, MUST be compiled  ... re bug?
        if not m:
            # Raised explicitly (rather than via ``assert``) so the check
            # survives ``python -O``; max() keeps the context slice from
            # wrapping around when fewer than 20 characters precede j.
            raise AssertionError(
                "Quote expected in string at ^ in %s^%s" % (
                    argstr[max(0, j - 20):j], argstr[j:j + 20]))

        i = m.start()
        try:
            ustr = ustr + argstr[j:i]
        except UnicodeError:
            err = ""
            for c in argstr[j:i]:
                err = err + (" %02x" % ord(c))
            streason = sys.exc_info()[1].__str__()
            raise BadSyntax(
                self._thisDoc, startline, argstr, j,
                "Unicode error appending characters" +
                " %s to string, because\n\t%s"
                % (err, streason))

        ch = argstr[i]
        if ch == delim1:
            # Let the top of the loop decide whether this closes the string.
            j = i
            continue
        elif ch in ('"', "'") and ch != delim1:
            # The other quote character is ordinary text here.
            ustr = ustr + ch
            j = i + 1
            continue
        elif ch in "\r\n":
            if delim == delim1:
                raise BadSyntax(
                    self._thisDoc, startline, argstr, i,
                    "newline found in string literal")
            self.lines = self.lines + 1
            ustr = ustr + ch
            j = i + 1
            self.startOfLine = j

        elif ch == "\\":
            j = i + 1
            ch = argstr[j:j + 1]  # Will be empty if string ends
            if not ch:
                raise BadSyntax(
                    self._thisDoc, startline, argstr, i,
                    "unterminated string literal (2)")
            # Parallel tables: escape letter -> character.  The trailing
            # entry makes \' valid, which single-quoted strings need.
            k = 'abfrtvn\\"\''.find(ch)
            if k >= 0:
                uch = '\a\b\f\r\t\v\n\\"\''[k]
                ustr = ustr + uch
                j = j + 1
            elif ch == "u":
                j, ch = self.uEscape(argstr, j + 1, startline)
                ustr = ustr + ch
            elif ch == "U":
                j, ch = self.UEscape(argstr, j + 1, startline)
                ustr = ustr + ch
            else:
                self.BadSyntax(argstr, i,
                               "bad escape")

    self.BadSyntax(argstr, i,
                   "unterminated string literal")
def _unicodeEscape(self, argstr, i, startline, reg, n, prefix):
    """Decode the ``n`` characters of a unicode escape found at ``argstr[i]``.

    ``prefix`` is the escape letter; the text ``'\\' + prefix`` plus the
    ``n`` characters is run through ``reg`` with ``unicodeExpand`` as the
    replacement function.

    Returns ``(i + n, expanded_text)``.  Raises BadSyntax when fewer than
    ``n`` characters remain, when expansion fails, or when ``reg`` does
    not match the escape at all.
    """
    if len(argstr) < i + n:
        raise BadSyntax(
            self._thisDoc, startline, argstr, i,
            "unterminated string literal(3)")

    digits = argstr[i:i + n]
    try:
        expanded, count = reg.subn(unicodeExpand, '\\' + prefix + digits)
    except Exception:
        # ``Exception`` rather than a bare except, so KeyboardInterrupt
        # and SystemExit are not turned into syntax errors.
        raise BadSyntax(
            self._thisDoc, startline, argstr, i,
            "bad string literal hex escape: " + digits)
    if not count:
        # No substitution means ``reg`` rejected the escape; without this
        # check the raw backslash sequence would be returned as text.
        raise BadSyntax(
            self._thisDoc, startline, argstr, i,
            "bad string literal hex escape: " + digits)
    return i + n, expanded
def uEscape(self, argstr, i, startline):
    """Decode a 4-character ``u`` escape at ``argstr[i]``.

    Delegates to ``_unicodeEscape`` with ``unicodeEscape4`` and returns
    its result.
    """
    return self._unicodeEscape(
        argstr, i, startline, reg=unicodeEscape4, n=4, prefix='u')
def UEscape(self, argstr, i, startline):
    """Decode an 8-character ``U`` escape at ``argstr[i]``.

    Delegates to ``_unicodeEscape`` with ``unicodeEscape8`` and returns
    its result.
    """
    return self._unicodeEscape(
        argstr, i, startline, reg=unicodeEscape8, n=8, prefix='U')
def BadSyntax(self, argstr, i, msg):
    """Raise a BadSyntax error for position ``i`` of ``argstr``.

    The error carries this parser's document (``self._thisDoc``) and its
    current line count (``self.lines``); ``msg`` is the reason.
    """
    error = BadSyntax(self._thisDoc, self.lines, argstr, i, msg)
    raise error
# If we are going to do operators then they should generate
# [ is operator:plus of ( \1 \2 ) ]
class BadSyntax(SyntaxError):
    """Syntax error found while parsing, with a snippet of the input.

    ``str()`` of the error gives the (1-based) line number, the document
    URI, the reason, and up to 60 characters of input on either side of
    the error position, which is marked with ``^``.
    """

    def __init__(self, uri, lines, argstr, i, why):
        """
        :param uri: identifier of the document being parsed
        :param lines: zero-based line count at the point of the error
        :param argstr: the text being parsed
        :param i: character index of the error within ``argstr``
        :param why: human-readable reason
        """
        # Keep the input as text: ``i`` is a character index, so slicing
        # an encoded copy would misplace the marker (and could cut a
        # multi-byte sequence) whenever non-ASCII text precedes it.
        if not isinstance(argstr, type(u"")):
            argstr = argstr.decode('utf-8', 'replace')
        self._str = argstr
        self._i = i
        self._why = why
        self.lines = lines
        self._uri = uri

    def __str__(self):
        argstr = self._str
        i = self._i
        st = 0
        if i > 60:
            pre = "..."
            st = i - 60
        else:
            pre = ""
        if len(argstr) - i > 60:
            post = "..."
        else:
            post = ""

        text = 'at line %i of <%s>:\nBad syntax (%s) at ^ in:\n"%s%s^%s%s"' \
            % (self.lines + 1, self._uri, self._why, pre,
               argstr[st:i], argstr[i:i + 60], post)
        if not isinstance(text, str):
            # Python 2: __str__ has to return a byte string.
            text = text.encode('utf-8')
        return text

    @property
    def message(self):
        """The full formatted error text (same as ``str(self)``)."""
        return str(self)
###############################################################################
class Formula(object):
    """A formula backed by a QuotedGraph in the parent's store.

    Each instance takes the next value of the class-wide ``number``
    counter and uses it in its blank-node identifier.
    """

    number = 0  # class-wide count of formulae created so far

    def __init__(self, parent):
        Formula.number += 1
        self.number = Formula.number
        self.uuid = uuid4().hex
        self.counter = 0  # number of anonymous blank nodes handed out
        self.existentials = {}
        self.universals = {}
        self.quotedgraph = QuotedGraph(
            store=parent.store, identifier=self.id())

    def __str__(self):
        return '_:Formula%s' % self.number

    def id(self):
        """Return the BNode that identifies this formula."""
        label = '_:Formula%s' % self.number
        return BNode(label)

    def newBlankNode(self, uri=None, why=None):
        """Return a blank node.

        With ``uri``, the id is the part of ``uri`` after the last ``#``
        with every ``_`` replaced by ``b``; otherwise it is built from
        this formula's uuid and an incremented counter.
        """
        if uri is not None:
            return BNode(uri.split('#')[-1].replace('_', 'b'))
        self.counter += 1
        return BNode('f%sb%s' % (self.uuid, self.counter))

    def newUniversal(self, uri, why=None):
        """Return a Variable named by the part of ``uri`` after the last ``#``."""
        name = uri.split('#')[-1]
        return Variable(name)

    def declareExistential(self, x):
        """Map ``x`` to a fresh blank node in ``self.existentials``."""
        self.existentials[x] = self.newBlankNode()

    def close(self):
        """Return the underlying quoted graph."""
        return self.quotedgraph
# Matches (and captures) one character in the range \x80-\xff.
# NOTE(review): no use of this pattern is visible in this part of the file.
r_hibyte = re.compile(r'([\x80-\xff])')
class RDFSink(object):
    """Receives parser events and turns them into rdflib terms and triples.

    Statements for the root formula go to ``self.graph``; statements for
    a Formula go to its quoted graph; anything else is added to the
    formula object directly.
    """

    def __init__(self, graph):
        self.rootFormula = None
        self.counter = 0  # number of anonymous blank nodes handed out
        self.graph = graph

    def newFormula(self):
        """Return a new Formula; the graph's store must be formula-aware."""
        assert self.graph.store.formula_aware
        f = Formula(self.graph)
        return f

    def newGraph(self, identifier):
        """Return a Graph with ``identifier`` on this sink's store."""
        return Graph(self.graph.store, identifier)

    def newSymbol(self, *args):
        """Return a URIRef built from the first argument."""
        return URIRef(args[0])

    def newBlankNode(self, arg=None, uri=None, why=None):
        """Return a blank node.

        A Formula ``arg`` delegates to ``arg.newBlankNode(uri)``.  A Graph
        or ``None`` gives a node numbered by this sink's counter.  For any
        other ``arg`` the id is derived from ``str(arg[0])``: the part
        after the last ``#`` with every ``_`` replaced by ``b``.
        """
        if isinstance(arg, Formula):
            return arg.newBlankNode(uri)
        elif isinstance(arg, Graph) or arg is None:
            self.counter += 1
            bn = BNode('n' + str(self.counter))
        else:
            bn = BNode(str(arg[0]).split('#').pop().replace('_', 'b'))
        return bn

    def newLiteral(self, s, dt=None, lang=None):
        """Return a Literal for ``s``.

        ``dt`` and ``lang`` default to ``None`` so that a caller holding
        only the text can call ``newLiteral(s)``.  When ``dt`` is truthy
        the literal is typed and ``lang`` is not used; otherwise
        ``lang`` is passed as the language.
        """
        if dt:
            return Literal(s, datatype=dt)
        else:
            return Literal(s, lang=lang)

    def newList(self, n, f):
        """Build an rdf:first/rdf:rest list for the items ``n`` in ``f``.

        Returns rdf:nil for an empty ``n``, otherwise the blank node at
        the head of the list.  One cell is created per item, recursing
        on the remainder.
        """
        if not n:
            return self.newSymbol(
                'http://www.w3.org/1999/02/22-rdf-syntax-ns#nil'
            )

        a = self.newBlankNode(f)
        first = self.newSymbol(
            'http://www.w3.org/1999/02/22-rdf-syntax-ns#first'
        )
        rest = self.newSymbol(
            'http://www.w3.org/1999/02/22-rdf-syntax-ns#rest')
        self.makeStatement((f, first, a, n[0]))
        self.makeStatement((f, rest, a, self.newList(n[1:], f)))
        return a

    def newSet(self, *args):
        """Return the arguments as a set."""
        return set(args)

    def setDefaultNamespace(self, *args):
        """Return the reprs of the arguments joined with ``:``."""
        return ':'.join(repr(n) for n in args)

    def makeStatement(self, quadruple, why=None):
        """Add one statement.

        ``quadruple`` is ``(formula, predicate, subject, object)`` -- note
        that the predicate comes before the subject.  Each term is
        passed through ``normalise`` first.
        """
        f, p, s, o = quadruple

        if hasattr(p, 'formula'):
            raise Exception("Formula used as predicate")

        s = self.normalise(f, s)
        p = self.normalise(f, p)
        o = self.normalise(f, o)

        if f == self.rootFormula:
            # print s, p, o, '.'
            self.graph.add((s, p, o))
        elif isinstance(f, Formula):
            f.quotedgraph.add((s, p, o))
        else:
            f.add((s, p, o))

        # return str(quadruple)

    def normalise(self, f, n):
        """Convert a parser value ``n`` into an rdflib term.

        Tuples become a URIRef of their second element; bool, integer,
        Decimal and float values become typed literals; a value declared
        existential in Formula ``f`` is replaced by its blank node;
        anything else is returned unchanged.
        """
        if isinstance(n, tuple):
            return URIRef(unicode(n[1]))

        # bool is tested before int because bool is a subclass of int.
        if isinstance(n, bool):
            s = Literal(str(n).lower(), datatype=BOOLEAN_DATATYPE)
            return s

        if isinstance(n, int) or isinstance(n, long):
            s = Literal(unicode(n), datatype=INTEGER_DATATYPE)
            return s

        if isinstance(n, Decimal):
            value = str(n)
            if value == '-0':
                value = '0'
            s = Literal(value, datatype=DECIMAL_DATATYPE)
            return s

        if isinstance(n, float):
            s = Literal(str(n), datatype=DOUBLE_DATATYPE)
            return s

        if isinstance(f, Formula):
            if n in f.existentials:
                return f.existentials[n]

        # if isinstance(n, Var):
        #    if f.universals.has_key(n):
        #       return f.universals[n]
        #    f.universals[n] = f.newBlankNode()
        #    return f.universals[n]

        return n

    def intern(self, something):
        """Return ``something`` unchanged."""
        return something

    def bind(self, pfx, uri):
        """Do nothing."""
        pass  # print pfx, ':', uri

    def startDoc(self, formula):
        """Record ``formula`` as the root formula."""
        self.rootFormula = formula

    def endDoc(self, formula):
        """Do nothing."""
        pass
###################################################
#
# Utilities
#
@py3compat.format_doctest_out
def hexify(ustr):
    """Return ``ustr`` with characters outside the printable ASCII range escaped

    Every character whose code point is below 33 or above 126 is replaced
    by a percent sign followed by its code point as at least two
    upper-case hexadecimal digits; all other characters are kept.  The
    assembled text is returned through ``b()``.

    >>> hexify("http://example/a b")
    %(b)s'http://example/a%%20b'

    """
    pieces = []
    for ch in ustr:
        code = ord(ch)
        if 33 <= code <= 126:
            pieces.append("%c" % code)
        else:
            pieces.append("%%%02X" % code)
    return b("".join(pieces))
class TurtleParser(Parser):
    """
    An RDFLib parser for Turtle

    See http://www.w3.org/TR/turtle/
    """

    def __init__(self):
        pass

    def parse(self, source, graph, encoding="utf-8", turtle=True):
        """Parse ``source`` into ``graph``.

        An RDFSink over ``graph`` is fed by a SinkParser reading the
        source's byte stream.  The base URI is the source's public id,
        else its system id, else the empty string, absolutized by
        ``graph``.  Afterwards every prefix collected in the parser's
        ``_bindings`` is bound on ``graph``.

        Raises Exception if ``encoding`` is neither ``None`` nor
        ``"utf-8"``.
        """
        if encoding not in [None, "utf-8"]:
            # One string (adjacent literals), not a tuple: applying % to
            # a tuple raises TypeError instead of this message.
            raise Exception(
                "N3/Turtle files are always utf-8 encoded, "
                "I was passed: %s" % (encoding,))

        sink = RDFSink(graph)

        baseURI = graph.absolutize(
            source.getPublicId() or source.getSystemId() or "")
        p = SinkParser(sink, baseURI=baseURI, turtle=turtle)

        p.loadStream(source.getByteStream())

        for prefix, namespace in p._bindings.items():
            graph.bind(prefix, namespace)
class N3Parser(TurtleParser):
    """
    An RDFLib parser for Notation3

    See http://www.w3.org/DesignIssues/Notation3.html
    """

    def __init__(self):
        pass

    def parse(self, source, graph, encoding="utf-8"):
        """Parse Notation3 from ``source`` into ``graph``.

        ``graph``'s store must be context-aware and formula-aware.  The
        graph is wrapped in a ConjunctiveGraph on the same store, with
        ``graph`` as its default context and sharing its namespace
        manager, and that wrapper is handed to ``TurtleParser.parse``
        with ``turtle=False``.
        """
        store = graph.store
        # we're currently being handed a Graph, not a ConjunctiveGraph
        assert store.context_aware  # is this implied by formula_aware
        assert store.formula_aware

        wrapper = ConjunctiveGraph(store=store)
        # TODO: CG __init__ should have a default_context arg
        wrapper.default_context = graph
        # TODO: update N3Processor so that it can use conj_graph as the sink
        wrapper.namespace_manager = graph.namespace_manager

        TurtleParser.parse(self, source, wrapper, encoding, turtle=False)
def _test():  # pragma: no cover
    """Run the doctests via ``doctest.testmod()``."""
    from doctest import testmod
    testmod()
# if __name__ == '__main__':
# _test()
def main():  # pragma: no cover
    """Parse the file named by ``sys.argv[1]`` and print every quad.

    The file is read as bytes and fed to a SinkParser whose sink writes
    into a fresh ConjunctiveGraph.  The base URI is ``file://`` plus the
    path joined onto the current directory, and the empty prefix is
    bound to that base followed by ``#``.
    """
    g = ConjunctiveGraph()

    sink = RDFSink(g)
    base_uri = 'file://' + os.path.join(os.getcwd(), sys.argv[1])

    p = SinkParser(sink, baseURI=base_uri)
    p._bindings[''] = p._baseURI + '#'
    p.startDoc()

    f = open(sys.argv[1], 'rb')
    try:
        rdbytes = f.read()
    finally:
        # Close the handle even if read() raises.
        f.close()

    p.feed(rdbytes)
    p.endDoc()
    for t in g.quads((None, None, None)):
        # Parenthesised form is valid as both a statement and a call.
        print(t)


if __name__ == '__main__':
    main()
# ends