"""
This serialiser will output an RDF Graph as a JSON-LD formatted document. See:
http://json-ld.org/
Example usage::
>>> from rdflib import Graph
>>> testrdf = '''
... @prefix dc: <http://purl.org/dc/terms/> .
... <http://example.org/about>
... dc:title "Someone's Homepage"@en .
... '''
>>> g = Graph().parse(data=testrdf, format='n3')
>>> print(g.serialize(format='json-ld', indent=2))
[
{
"@id": "http://example.org/about",
"http://purl.org/dc/terms/title": [
{
"@language": "en",
"@value": "Someone's Homepage"
}
]
}
]
"""
# From: https://github.com/RDFLib/rdflib-jsonld/blob/feature/json-ld-1.1/rdflib_jsonld/serializer.py
# NOTE: This code writes the entire JSON object into memory before serialising,
# but we should consider streaming the output to deal with arbitrarily large
# graphs.
from __future__ import annotations
import warnings
from typing import IO, TYPE_CHECKING, Any, Union, cast
from rdflib.graph import DATASET_DEFAULT_GRAPH_ID, Graph
from rdflib.namespace import RDF, XSD
from rdflib.serializer import Serializer
from rdflib.term import BNode, IdentifiedNode, Literal, URIRef
from ..shared.jsonld.context import UNDEF, Context
from ..shared.jsonld.keys import CONTEXT, GRAPH, ID, LANG, LIST, SET, VOCAB
from ..shared.jsonld.util import _HAS_ORJSON, json, orjson
if TYPE_CHECKING:
from rdflib.graph import _ObjectType
# In JSON-LD, a Literal cannot be Subject. So define a new type
from ..shared.jsonld.context import JSONLDSubjectType, Term
# Names exported by ``from <this module> import *``.
__all__ = ["JsonLDSerializer", "from_rdf"]
# XSD datatypes whose literals are converted with ``Literal.toPython()``
# (instead of ``str()``) when native types are enabled in the converter.
PLAIN_LITERAL_TYPES = {XSD.boolean, XSD.integer, XSD.double, XSD.string}
[docs]
class JsonLDSerializer(Serializer):
    """Serializer that writes the wrapped store as a JSON-LD document."""

    def __init__(self, store: Graph):
        """Remember ``store`` as the graph to serialize."""
        super().__init__(store)

    def serialize(
        self,
        stream: IO[bytes],
        base: str | None = None,
        encoding: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Convert the store to JSON-LD and write the encoded bytes to ``stream``.

        :param stream: binary stream that receives the document.
        :param base: base IRI handed to :func:`from_rdf`.
        :param encoding: output encoding; defaults to ``"utf-8"``. A warning
            is emitted for anything other than ``"utf-8"`` / ``"utf-16"``.
        :param kwargs: optional settings read by this method:

            * ``context`` -- context data handed to :func:`from_rdf`.
            * ``use_native_types`` (default ``False``), ``use_rdf_type``
              (default ``False``), ``auto_compact`` (default ``False``) --
              forwarded to :func:`from_rdf`.
            * ``indent`` (default ``2``), ``separators`` (default
              ``(",", ": ")``), ``sort_keys`` (default ``True``),
              ``ensure_ascii`` (default ``False``) -- JSON formatting.
              With orjson only "indent or not" and ``sort_keys`` are
              honoured; ``ensure_ascii`` triggers a warning.
        """
        encoding = encoding or "utf-8"
        if encoding not in ("utf-8", "utf-16"):
            warnings.warn(
                "JSON should be encoded as unicode. " f"Given encoding was: {encoding}"
            )

        context_data = kwargs.get("context")
        # NOTE: this used to be wrapped in a one-element tuple (stray trailing
        # comma), which is always truthy and therefore silently forced native
        # types on regardless of what the caller passed.
        use_native_types = kwargs.get("use_native_types", False)
        use_rdf_type = kwargs.get("use_rdf_type", False)
        auto_compact = kwargs.get("auto_compact", False)

        indent = kwargs.get("indent", 2)
        separators = kwargs.get("separators", (",", ": "))
        sort_keys = kwargs.get("sort_keys", True)
        ensure_ascii = kwargs.get("ensure_ascii", False)

        obj = from_rdf(
            self.store,
            context_data,
            base,
            use_native_types,
            use_rdf_type,
            auto_compact=auto_compact,
        )

        if _HAS_ORJSON:
            option: int = orjson.OPT_NON_STR_KEYS
            if indent is not None:
                # orjson only supports a fixed two-space indentation.
                option |= orjson.OPT_INDENT_2
            if sort_keys:
                option |= orjson.OPT_SORT_KEYS
            if ensure_ascii:
                warnings.warn("Cannot use ensure_ascii with orjson")
            data_bytes = orjson.dumps(obj, option=option)
            if encoding.lower() not in ("utf-8", "utf8"):
                # orjson always produces UTF-8; transcode so the requested
                # encoding is honoured exactly like in the json fallback.
                data_bytes = data_bytes.decode("utf-8").encode(encoding, "replace")
            stream.write(data_bytes)
        else:
            data = json.dumps(
                obj,
                indent=indent,
                separators=separators,
                sort_keys=sort_keys,
                ensure_ascii=ensure_ascii,
            )
            stream.write(data.encode(encoding, "replace"))
[docs]
def from_rdf(
    graph,
    context_data=None,
    base=None,
    use_native_types=False,
    use_rdf_type=False,
    auto_compact=False,
    startnode=None,
    index=False,
):
    """Convert ``graph`` into JSON-LD shaped Python data.

    :param graph: graph (or context-aware store) to convert.
    :param context_data: a ``Context`` instance, or raw context data used to
        build one. When empty and ``auto_compact`` is set, a context is
        derived from the graph's namespace bindings.
    :param base: base IRI passed to ``Context`` when one is built here.
    :param use_native_types: forwarded to ``Converter``.
    :param use_rdf_type: forwarded to ``Converter``.
    :param auto_compact: build a prefix context from ``graph.namespaces()``
        when no ``context_data`` was given.
    :param startnode: accepted but currently unused.
    :param index: accepted but currently unused.
    :return: the converter's result; when the context is active the result
        is a dict that also carries the context data under ``CONTEXT``.
    """
    # TODO: support for index and startnode
    if not context_data and auto_compact:
        # Use every named prefix binding except the XML namespace.
        context_data = {
            prefix: str(namespace)
            for prefix, namespace in graph.namespaces()
            if prefix and str(namespace) != "http://www.w3.org/XML/1998/namespace"
        }

    if not isinstance(context_data, Context):
        context = Context(context_data, base=base)
    else:
        context = context_data
        context_data = context.to_dict()

    converter = Converter(context, use_native_types, use_rdf_type)
    result = converter.convert(graph)

    if not converter.context.active:
        return result

    # With an active context the document must be a single object so the
    # context can be attached; a bare list is wrapped under the graph key.
    if isinstance(result, list):
        result = {context.get_key(GRAPH): result}
    result[CONTEXT] = context_data
    return result
class Converter:
    """Build JSON-LD shaped ``dict``/``list`` structures from an RDF graph.

    The converter only produces Python objects; encoding them as JSON text
    is left to the caller.
    """

    def __init__(self, context: Context, use_native_types: bool, use_rdf_type: bool):
        """
        :param context: context used to look up terms and keys.
        :param use_native_types: convert literals whose datatype is in
            ``PLAIN_LITERAL_TYPES`` with ``toPython()``. Always enabled when
            ``context.active`` is truthy.
        :param use_rdf_type: when true, ``rdf:type`` is not mapped to the
            context's type key.
        """
        self.context = context
        self.use_native_types = context.active or use_native_types
        self.use_rdf_type = use_rdf_type

    def convert(self, graph: Graph):
        """Convert ``graph`` (optionally context-aware) to JSON-LD data.

        :return: a list of node/graph objects, or a single dict when the
            context is active and exactly one top-level object was produced.
        """
        # TODO: bug in rdflib dataset parsing (nquads et al):
        # plain triples end up in separate unnamed graphs (rdflib issue #436)
        if graph.context_aware:
            # type error: "Graph" has no attribute "contexts"
            all_contexts = list(graph.contexts())  # type: ignore[attr-defined]
            has_dataset_default_id = any(
                c.identifier == DATASET_DEFAULT_GRAPH_ID for c in all_contexts
            )
            if (
                has_dataset_default_id
                # # type error: "Graph" has no attribute "contexts"
                and graph.default_context.identifier == DATASET_DEFAULT_GRAPH_ID  # type: ignore[attr-defined]
            ):
                default_graph = graph.default_context  # type: ignore[attr-defined]
            else:
                default_graph = Graph()
            graphs = [default_graph]
            default_graph_id = default_graph.identifier

            for g in all_contexts:
                if g in graphs:
                    continue
                if isinstance(g.identifier, URIRef):
                    graphs.append(g)
                else:
                    # Graphs without an IRI name are folded into the default
                    # graph. NOTE(review): when ``default_graph`` is the
                    # store's own default context this adds triples to it.
                    default_graph += g
        else:
            graphs = [graph]
            default_graph_id = graph.identifier

        context = self.context

        objs: list[Any] = []
        for g in graphs:
            obj = {}
            graphname = None

            if isinstance(g.identifier, URIRef):
                if g.identifier != default_graph_id:
                    graphname = context.shrink_iri(g.identifier)
                    obj[context.id_key] = graphname

            nodes = self.from_graph(g)

            if not graphname and len(nodes) == 1:
                # A lone node of an unnamed graph is emitted directly.
                obj.update(nodes[0])
            else:
                if not nodes:
                    continue
                obj[context.graph_key] = nodes

            if objs and objs[0].get(context.get_key(ID)) == graphname:
                objs[0].update(obj)
            else:
                objs.append(obj)

        if len(graphs) == 1 and len(objs) == 1 and not self.context.active:
            # Unwrap a single object that holds nothing but a graph key.
            default = objs[0]
            items = default.get(context.graph_key)
            if len(default) == 1 and items:
                objs = items
        elif len(objs) == 1 and self.context.active:
            objs = objs[0]

        return objs

    def from_graph(self, graph: Graph):
        """Return the node objects of ``graph`` as a list.

        Processing starts from every ``URIRef`` subject and every ``BNode``
        subject that does not occur as an object in ``graph``; other blank
        nodes are added to the node map when they are reached as objects.
        """
        nodemap: dict[Any, Any] = {}

        for s in set(graph.subjects()):
            ## only iri:s and unreferenced (rest will be promoted to top if needed)
            if isinstance(s, URIRef) or (
                isinstance(s, BNode) and not any(graph.subjects(None, s))
            ):
                self.process_subject(graph, s, nodemap)

        return list(nodemap.values())

    def process_subject(self, graph: Graph, s: IdentifiedNode, nodemap):
        """Create the node object for ``s`` and register it in ``nodemap``.

        :return: the new node dict, or ``None`` when a node with the same id
            is already present in ``nodemap``.
        """
        if isinstance(s, URIRef):
            node_id = self.context.shrink_iri(s)
        elif isinstance(s, BNode):
            node_id = s.n3()
        else:
            # This does not seem right, this probably should be an error.
            node_id = None

        # used_as_object = any(graph.subjects(None, s))
        if node_id in nodemap:
            return None

        node = {}
        node[self.context.id_key] = node_id
        # Register before descending so cyclic references terminate.
        nodemap[node_id] = node

        for p, o in graph.predicate_objects(s):
            # predicate_objects can return a lot of different types,
            # but we only need to consider it to be a JSON-compatible type.
            object_val = cast(Union[IdentifiedNode, Literal], o)
            pred_val = cast(IdentifiedNode, p)
            self.add_to_node(graph, s, pred_val, object_val, node, nodemap)

        return node

    def add_to_node(
        self,
        graph: Graph,
        s: IdentifiedNode,
        p: IdentifiedNode,
        o: IdentifiedNode | Literal,
        s_node: dict[str, Any],
        nodemap,
    ):
        """Add the value of the triple ``(s, p, o)`` to the node dict ``s_node``.

        The key and value shape are chosen from the context term found for
        ``p`` (if any); repeated values for the same key are collected in a
        list.
        """
        context = self.context

        term: Term | None = None
        if isinstance(o, Literal):
            _datatype = str(o.datatype) if o.datatype else None
            language = o.language
            term = context.find_term(str(p), _datatype, language=language)
        else:
            # Compare against None: an rdf:first value such as 0, false or ""
            # is a falsy Literal but still marks ``o`` as a list node.
            is_list_node = graph.value(o, RDF.first) is not None
            containers = [LIST, None] if is_list_node else [None]
            for container in containers:
                for coercion in (ID, VOCAB, UNDEF):
                    term = context.find_term(str(p), coercion, container)
                    if term:
                        break
                if term:
                    break
            language = None if term is None else term.language

        node: str | list[Any] | dict[str, Any] | None = None
        use_set = not context.active

        if term is not None:
            p_key = term.name

            if term.type:
                node = self.type_coerce(o, term.type)
            elif (
                term.language and isinstance(o, Literal) and o.language == term.language
            ):
                node = str(o)
            elif context.language:
                # TODO: MyPy thinks this is unreachable?
                if isinstance(o, Literal) and (  # type: ignore[unreachable]
                    term.language is None and o.language is None
                ):
                    node = str(o)

            if LIST in term.container:
                assert isinstance(
                    o, IdentifiedNode
                ), "Subject of a @list container must be a URI or BNode"
                _col = self.to_collection(graph, o)
                if _col is not None:
                    node = []
                    for v in _col:
                        if isinstance(v, (IdentifiedNode, Literal)):
                            coerced = self.type_coerce(v, term.type)
                        else:
                            coerced = None
                        if coerced is not None:
                            node.append(coerced)
                        else:
                            node.append(self.to_raw_value(graph, s, v, nodemap))
            elif LANG in term.container and language:
                # Language map: {language: value or list of values}.
                value = s_node.setdefault(p_key, {})
                values = value.get(language)
                node = str(o)
                if values is None and SET not in term.container:
                    value[language] = node
                else:
                    if values is None:
                        # First value of a set container: start with an
                        # empty list rather than wrapping the missing entry.
                        value[language] = values = []
                    elif not isinstance(values, list):
                        value[language] = values = [values]
                    values.append(node)
                return
            elif SET in term.container:
                use_set = True

        else:
            p_key = context.to_symbol(p)
            # TODO: for coercing curies - quite clumsy; unify to_symbol and find_term?
            key_term = context.terms.get(p_key)
            if key_term and (key_term.type or key_term.container):
                p_key = p
            if not term and p == RDF.type and not self.use_rdf_type:
                if isinstance(o, URIRef):
                    node = context.to_symbol(o)
                p_key = context.type_key

        if node is None:
            node = self.to_raw_value(graph, s, o, nodemap)

        existing = s_node.get(p_key)
        # A stored value can be falsy (native 0/False/"" or a coerced Literal
        # with such a value), so test for presence rather than truthiness;
        # otherwise the next value would overwrite it. A missing key and an
        # empty list keep their previous handling.
        is_empty_list = isinstance(existing, list) and not existing
        if existing is not None and not is_empty_list:
            if not isinstance(existing, list):
                existing = [existing]
            existing.append(node)
            merged = existing
        elif use_set:
            merged = [node]
        else:
            merged = node
        s_node[p_key] = merged

    def type_coerce(
        self, o: IdentifiedNode | Literal, coerce_type: str
    ) -> str | IdentifiedNode | Literal | None:
        """Return ``o`` in the compact form required by ``coerce_type``.

        :return: the coerced value, or ``None`` when ``o`` does not match
            ``coerce_type`` and must be emitted in its raw form instead.
        """
        if coerce_type == ID:
            if isinstance(o, URIRef):
                return self.context.shrink_iri(o)
            elif isinstance(o, BNode):
                return o.n3()
            else:
                return o
        elif coerce_type == VOCAB and isinstance(o, URIRef):
            return self.context.to_symbol(o)
        elif isinstance(o, Literal) and str(o.datatype) == coerce_type:
            return o
        else:
            return None

    def to_raw_value(
        self,
        graph: Graph,
        s: JSONLDSubjectType,
        o: _ObjectType,
        nodemap: dict[str, Any],
    ):
        """Return the uncoerced JSON-LD representation of the object ``o``.

        Well-formed RDF lists become a list object, other nodes become a
        reference by id (blank nodes are also processed into ``nodemap``),
        and literals become a value object or a bare value.
        """
        context = self.context
        if isinstance(o, (URIRef, BNode)):
            coll: list[Any] | None = self.to_collection(graph, o)
        else:
            coll = None
        if coll is not None:
            coll = [self.to_raw_value(graph, s, lo, nodemap) for lo in coll]
            return {context.list_key: coll}
        elif isinstance(o, BNode):
            embed = (
                False  # TODO: self.context.active or using startnode and only one ref
            )
            onode = self.process_subject(graph, o, nodemap)
            if onode:
                if embed and not any(s2 for s2 in graph.subjects(None, o) if s2 != s):
                    return onode
                else:
                    nodemap[onode[context.id_key]] = onode
            return {context.id_key: o.n3()}
        elif isinstance(o, URIRef):
            # TODO: embed if o != startnode (else reverse)
            return {context.id_key: context.shrink_iri(o)}
        elif isinstance(o, Literal):
            # TODO: if compact
            native = self.use_native_types and o.datatype in PLAIN_LITERAL_TYPES
            if native:
                v = o.toPython()
            else:
                v = str(o)
            if o.datatype:
                if native and self.context.active:
                    return v
                return {
                    context.type_key: context.to_symbol(o.datatype),
                    context.value_key: v,
                }
            elif o.language and o.language != context.language:
                return {context.lang_key: o.language, context.value_key: v}
            # type error: Right operand of "and" is never evaluated
            elif not context.active or context.language and not o.language:  # type: ignore[unreachable]
                return {context.value_key: v}
            else:
                return v

    def to_collection(
        self, graph: Graph, l_: JSONLDSubjectType
    ) -> list[_ObjectType] | None:
        """Return the items of the RDF list starting at ``l_``.

        :return: the list items in order (``[]`` for ``rdf:nil``), or
            ``None`` when ``l_`` is not the head of a well-formed list: a
            node without ``rdf:first``, an IRI-named list node, a node with
            properties other than ``rdf:first``/``rdf:rest``/
            ``rdf:type rdf:List``, a missing ``rdf:rest`` or a cycle.
        """
        # Presence check, not truthiness: the first item may be a falsy
        # literal such as 0, false or "".
        if l_ != RDF.nil and graph.value(l_, RDF.first) is None:
            return None
        list_nodes: list[_ObjectType] = []
        chain: set[_ObjectType] = {l_}
        list_head: _ObjectType | None = l_
        while list_head:
            if list_head == RDF.nil:
                # The only way to return a real result is to reach
                # a rdf:nil node at the end of a rdf list.
                return list_nodes
            if isinstance(list_head, URIRef):
                return None
            first, rest = None, None
            for p, o in graph.predicate_objects(list_head):
                if first is None and p == RDF.first:
                    first = o
                elif rest is None and p == RDF.rest:
                    rest = o
                elif p != RDF.type or o != RDF.List:
                    # Duplicate rdf:first/rdf:rest or any foreign property
                    # means this is not a plain list node.
                    return None
            if first is not None:
                list_nodes.append(first)
            if rest is None:
                # TODO: If no rdf:rest is found, should we return the current list_nodes?
                return None
            list_head = rest
            if list_head in chain:
                return None  # TODO: Should this just return the current list_nodes?
            chain.add(list_head)
        return None