Source code for rdflib.store

"""
============
rdflib.store
============

Types of store
--------------

``Context-aware``: An RDF store capable of storing statements within contexts
is considered context-aware. Essentially, such a store is able to partition
the RDF model it represents into individual, named, and addressable
sub-graphs.

Relevant Notation3 reference regarding formulae, quoted statements, and such:
http://www.w3.org/DesignIssues/Notation3.html

``Formula-aware``: An RDF store capable of distinguishing between statements
that are asserted and statements that are quoted is considered formula-aware.

``Transaction-capable``: capable of providing transactional integrity to the
RDF operations performed on it.

``Graph-aware``: capable of keeping track of empty graphs.

------
"""

from __future__ import annotations

import pickle
from io import BytesIO
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Generator,
    Iterable,
    Iterator,
    List,
    Mapping,
    Optional,
    Tuple,
    Union,
)

from rdflib.events import Dispatcher, Event

if TYPE_CHECKING:
    from rdflib.graph import (
        Graph,
        _ContextType,
        _ObjectType,
        _PredicateType,
        _QuadType,
        _SubjectType,
        _TriplePatternType,
        _TripleType,
    )
    from rdflib.plugins.sparql.sparql import Query, Update
    from rdflib.query import Result
    from rdflib.term import Identifier, Node, URIRef


# Constants representing the state of a Store (returned by the open method)
# ``Store.open`` documents VALID_STORE, CORRUPTED_STORE and NO_STORE as the
# values an implementation should return.
VALID_STORE = 1
CORRUPTED_STORE = 0
NO_STORE = -1
# Returned by the base ``Store.open``, which does not open anything itself.
UNKNOWN: None = None


# Module-level aliases for the standard-library pickle machinery; NodePickler
# in this module is written against these names.
Pickler = pickle.Pickler
Unpickler = pickle.Unpickler
UnpicklingError = pickle.UnpicklingError

# Names exported by a star-import of this module. The state constants and the
# pickle aliases above are not listed here.
__all__ = [
    "StoreCreatedEvent",
    "TripleAddedEvent",
    "TripleRemovedEvent",
    "NodePickler",
    "Store",
]


class StoreCreatedEvent(Event):
    """
    Event fired when a Store is created.

    It carries a single attribute:

      - ``configuration``: the string that was used to create the store

    ``Store.create`` dispatches this event with ``configuration`` set to the
    string it was called with.
    """
class TripleAddedEvent(Event):
    """
    Event fired when a triple is added.

    It has the following attributes:

      - ``triple``: the triple that was added to the graph
      - ``context``: the context of the triple, if any
      - ``graph``: the graph to which the triple was added
    """
class TripleRemovedEvent(Event):
    """
    Event fired when a triple is removed.

    It has the following attributes:

      - ``triple``: the triple that was removed from the graph
      - ``context``: the context of the triple, if any
      - ``graph``: the graph from which the triple was removed
    """
class NodePickler:
    """
    Pickle helper that replaces registered objects with short string ids.

    Objects handed to :meth:`register` are not serialised by value. While
    dumping, each of them is swapped for its registered id through pickle's
    "persistent id" hook; while loading, every id is mapped back to whatever
    object is registered under it on *this* pickler.

    .. warning::
        :meth:`loads` runs :mod:`pickle` on the bytes it receives, and
        unpickling can execute arbitrary code. Only pass it data that comes
        from a trusted source.
    """

    def __init__(self) -> None:
        # id -> object, consulted while loading.
        self._objects: Dict[str, Any] = {}
        # object -> id, consulted while dumping.
        self._ids: Dict[Any, str] = {}
        # Bound ``dict.__getitem__`` installed as the unpickler's
        # ``persistent_load`` hook. It is tied to the current ``_objects``
        # dict, so it has to be rebuilt whenever that dict is replaced
        # (see ``__setstate__``).
        self._get_object = self._objects.__getitem__

    def _get_ids(self, key: Any) -> Optional[str]:
        """
        Return the id registered for ``key``, or ``None`` if there is none.

        Used as the pickler's ``persistent_id`` hook, so it is called for
        every object being pickled, including unhashable ones such as lists
        and dicts.
        """
        try:
            return self._ids.get(key)
        except TypeError:
            # ``key`` is unhashable, so it cannot have been registered.
            return None

    def register(self, object: Any, id: str) -> None:
        """
        Associate ``object`` with the persistent id ``id``.

        ``object`` must be hashable because it is used as a dictionary key.
        Registering the same ``id`` or the same ``object`` again overwrites
        the corresponding earlier entry.
        """
        self._objects[id] = object
        self._ids[object] = id

    def loads(self, s: bytes) -> Node:
        """
        Unpickle ``s``, resolving persistent ids against this pickler.

        :param s: bytes previously produced by :meth:`dumps`. Must come from
            a trusted source, since unpickling can execute arbitrary code.
        :returns: the reconstructed object
        :raises UnpicklingError: if a ``KeyError`` is raised while loading,
            which is what happens when ``s`` refers to an id that is not
            registered here. The original ``KeyError`` is attached as the
            cause.
        """
        up = Unpickler(BytesIO(s))
        # NOTE on type error: https://github.com/python/mypy/issues/2427
        # type error: Cannot assign to a method
        up.persistent_load = self._get_object  # type: ignore[assignment]
        try:
            return up.load()
        except KeyError as e:
            raise UnpicklingError("Could not find Node class for %s" % e) from e

    def dumps(
        self, obj: Node, protocol: Optional[Any] = None, bin: Optional[Any] = None
    ) -> bytes:
        """
        Pickle ``obj``, writing registered objects as their persistent ids.

        :param obj: the object to serialise
        :param protocol: pickle protocol to use; ``None`` (the default)
            selects pickle's default protocol
        :param bin: accepted for backward compatibility and ignored
        :returns: the pickled bytes
        """
        src = BytesIO()
        # Forward the requested protocol instead of silently dropping it.
        p = Pickler(src, protocol)
        # NOTE on type error: https://github.com/python/mypy/issues/2427
        # type error: Cannot assign to a method
        p.persistent_id = self._get_ids  # type: ignore[assignment]
        p.dump(obj)
        return src.getvalue()

    def __getstate__(self) -> Mapping[str, Any]:
        """
        Return the instance state for pickling.

        The cached ``_get_object`` bound method is left out, and the two
        lookup tables are stored as tuples of ``(key, value)`` pairs.
        """
        state = self.__dict__.copy()
        del state["_get_object"]
        state.update(
            {"_ids": tuple(self._ids.items()), "_objects": tuple(self._objects.items())}
        )
        return state

    def __setstate__(self, state: Mapping[str, Any]) -> None:
        """
        Restore the instance from ``state`` as produced by ``__getstate__``.

        The pair tuples are turned back into dicts and the ``_get_object``
        lookup is rebuilt against the new ``_objects`` dict.
        """
        self.__dict__.update(state)
        self._ids = dict(self._ids)
        self._objects = dict(self._objects)
        self._get_object = self._objects.__getitem__
class Store:
    """
    Base class for RDF stores.

    Most methods here are placeholders that do nothing, return ``None`` or
    raise; concrete stores are expected to override them. The base class
    itself only dispatches events (``create``, ``add``, ``remove``), lazily
    builds a :class:`NodePickler`, and supplies fallback implementations of
    ``addN`` and ``triples_choices`` on top of ``add`` and ``triples``.
    """

    # Properties
    # Capability flags; every one of them is off in the base class.
    context_aware: bool = False
    formula_aware: bool = False
    transaction_aware: bool = False
    graph_aware: bool = False

    def __init__(
        self,
        configuration: Optional[str] = None,
        identifier: Optional[Identifier] = None,
    ):
        """
        Set up the event dispatcher and optionally open the store.

        :param configuration: string containing information ``open`` can use
            to connect to the datastore. When it is truthy, ``open`` is
            called with it straight away.
        :param identifier: URIRef of the Store. It is accepted here but not
            used by this base implementation.
        """
        # Created on first access of the ``node_pickler`` property.
        self.__node_pickler: Optional[NodePickler] = None
        self.dispatcher = Dispatcher()
        if configuration:
            self.open(configuration)

    @property
    def node_pickler(self) -> NodePickler:
        """
        The :class:`NodePickler` for this store, created on first access.

        The pickler is pre-registered with the store itself (id ``"S"``) and
        with the term and graph classes imported below.
        """
        pickler = self.__node_pickler
        if pickler is not None:
            return pickler

        # Imported here rather than at module level, as in the original code
        # (presumably to avoid a circular import - TODO confirm).
        from rdflib.graph import Graph, QuotedGraph
        from rdflib.term import BNode, Literal, URIRef, Variable

        pickler = NodePickler()
        # Cache before registering so the attribute is set even if a
        # registration fails, exactly as before.
        self.__node_pickler = pickler
        registrations = (
            (self, "S"),
            (URIRef, "U"),
            (BNode, "B"),
            (Literal, "L"),
            (Graph, "G"),
            (QuotedGraph, "Q"),
            (Variable, "V"),
        )
        for target, tag in registrations:
            pickler.register(target, tag)
        return pickler

    # Database management methods

    # NOTE: Can't find any stores using this, we should consider deprecating it.
    def create(self, configuration: str) -> None:
        """
        Dispatch a :class:`StoreCreatedEvent` carrying ``configuration``.
        """
        event = StoreCreatedEvent(configuration=configuration)
        self.dispatcher.dispatch(event)

    def open(self, configuration: str, create: bool = False) -> Optional[int]:
        """
        Open the store specified by the configuration string.

        If ``create`` is True a store will be created if it does not already
        exist. If ``create`` is False and a store does not already exist an
        exception is raised. An exception is also raised if a store exists
        but there are insufficient permissions to open it.

        Implementations should return one of: VALID_STORE, CORRUPTED_STORE,
        or NO_STORE. This base implementation opens nothing and returns
        UNKNOWN.
        """
        return UNKNOWN

    def close(self, commit_pending_transaction: bool = False) -> None:
        """
        Close the database connection.

        ``commit_pending_transaction`` specifies whether to commit all
        pending transactions before closing (if the store is transactional).
        The base implementation does nothing.
        """

    def destroy(self, configuration: str) -> None:
        """
        Destroy the instance of the store identified by the configuration
        string. The base implementation does nothing.
        """

    def gc(self) -> None:
        """
        Allow the store to perform any needed garbage collection. The base
        implementation does nothing.
        """
        pass

    # RDF APIs

    def add(
        self,
        triple: _TripleType,
        context: _ContextType,
        quoted: bool = False,
    ) -> None:
        """
        Add the given statement to a specific context or to the model.

        The ``quoted`` argument is interpreted by formula-aware stores to
        indicate that this statement is quoted/hypothetical. It should be an
        error to not specify a context and have ``quoted`` be True. It should
        also be an error for ``quoted`` to be True when the store is not
        formula-aware.

        The base implementation stores nothing; it only dispatches a
        :class:`TripleAddedEvent` with ``triple`` and ``context``.
        """
        event = TripleAddedEvent(triple=triple, context=context)
        self.dispatcher.dispatch(event)

    def addN(self, quads: Iterable[_QuadType]) -> None:  # noqa: N802
        """
        Add each quad in ``quads`` to the context it names.

        Note that this default implementation simply redirects to ``add``,
        one quad at a time, and asserts that every quad has a context.
        """
        for subj, pred, obj, ctx in quads:
            assert ctx is not None, "Context associated with %s %s %s is None!" % (
                subj,
                pred,
                obj,
            )
            self.add((subj, pred, obj), ctx)

    def remove(
        self,
        triple: _TriplePatternType,
        context: Optional[_ContextType] = None,
    ) -> None:
        """
        Remove the set of triples matching the pattern from the store.

        The base implementation removes nothing; it only dispatches a
        :class:`TripleRemovedEvent` with ``triple`` and ``context``.
        """
        event = TripleRemovedEvent(triple=triple, context=context)
        self.dispatcher.dispatch(event)

    def triples_choices(
        self,
        triple: Union[
            Tuple[List[_SubjectType], _PredicateType, _ObjectType],
            Tuple[_SubjectType, List[_PredicateType], _ObjectType],
            Tuple[_SubjectType, _PredicateType, List[_ObjectType]],
        ],
        context: Optional[_ContextType] = None,
    ) -> Generator[
        Tuple[
            _TripleType,
            Iterator[Optional[_ContextType]],
        ],
        None,
        None,
    ]:
        """
        A variant of ``triples`` that takes a list of terms in one slot.

        Stores can override this to do better than this fallback, which
        calls ``triples`` once per term in the list. An empty list acts as a
        wildcard (``None``) for its slot. If no slot holds a list, nothing is
        yielded. Having lists in two slots fails an assertion.
        """
        subject, predicate, object_ = triple

        # Work out the sequence of plain patterns to query. The generator
        # expressions keep iteration over the caller's list lazy.
        if isinstance(object_, list):
            assert not isinstance(subject, list), "object_ / subject are both lists"
            assert not isinstance(predicate, list), "object_ / predicate are both lists"
            patterns = ((subject, predicate, term) for term in (object_ or [None]))
        elif isinstance(subject, list):
            assert not isinstance(predicate, list), "subject / predicate are both lists"
            patterns = ((term, predicate, object_) for term in (subject or [None]))
        elif isinstance(predicate, list):
            assert not isinstance(subject, list), "predicate / subject are both lists"
            patterns = ((subject, term, object_) for term in (predicate or [None]))
        else:
            return

        for pattern in patterns:
            for (s1, p1, o1), cg in self.triples(pattern, context):
                yield (s1, p1, o1), cg

    # type error: Missing return statement
    def triples(  # type: ignore[return]
        self,
        triple_pattern: _TriplePatternType,
        context: Optional[_ContextType] = None,
    ) -> Iterator[Tuple[_TripleType, Iterator[Optional[_ContextType]]]]:
        """
        A generator over all the triples matching the pattern.

        The pattern can include any objects used for comparing against nodes
        in the store, for example REGEXTerm, URIRef, Literal, BNode,
        Variable, Graph, QuotedGraph, Date? DateRange?

        The base implementation only unpacks the pattern and returns
        ``None``; stores are expected to override it.

        :param context: A conjunctive query can be indicated by providing a
            value of None, or a specific context can be queried by passing a
            Graph instance (if the store is context aware).
        """
        # Unpacking is kept so a malformed pattern still fails here.
        _subject, _predicate, _object = triple_pattern

    # variants of triples will be done if / when optimization is needed

    # type error: Missing return statement
    def __len__(self, context: Optional[_ContextType] = None) -> int:  # type: ignore[empty-body]
        """
        Number of statements in the store.

        This should only account for non-quoted (asserted) statements if the
        context is not specified, otherwise it should return the number of
        statements in the formula or context given. The base implementation
        has no body.

        :param context: a graph instance to query or None
        """

    # type error: Missing return statement
    def contexts(  # type: ignore[empty-body]
        self, triple: Optional[_TripleType] = None
    ) -> Generator[_ContextType, None, None]:
        """
        Generator over all contexts in the graph.

        If ``triple`` is specified, a generator over all contexts the triple
        is in. If the store is graph_aware, this may also return empty
        contexts. The base implementation has no body.

        :returns: a generator over Nodes
        """

    # TODO FIXME: the result of query is inconsistent.
    def query(
        self,
        query: Union[Query, str],
        initNs: Mapping[str, Any],  # noqa: N803
        initBindings: Mapping[str, Identifier],  # noqa: N803
        queryGraph: str,  # noqa: N803
        **kwargs: Any,
    ) -> Result:
        """
        Run a SPARQL query; stores with their own SPARQL implementation
        override this.

        ``queryGraph`` is None, a URIRef or ``'__UNION__'``:

          - None: the graph is specified in the query string/object
          - URIRef: it specifies the graph to query
          - ``'__UNION__'``: the union of all named graphs should be queried
            (this is used by ConjunctiveGraphs)

        Values other than None obviously only make sense for context-aware
        stores.

        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError

    def update(
        self,
        update: Union[Update, str],
        initNs: Mapping[str, Any],  # noqa: N803
        initBindings: Mapping[str, Identifier],  # noqa: N803
        queryGraph: str,  # noqa: N803
        **kwargs: Any,
    ) -> None:
        """
        Run a (SPARQL) Update; stores with their own implementation override
        this.

        ``queryGraph`` is None, a URIRef or ``'__UNION__'``:

          - None: the graph is specified in the query string/object
          - URIRef: it specifies the graph to query
          - ``'__UNION__'``: the union of all named graphs should be queried
            (this is used by ConjunctiveGraphs)

        Values other than None obviously only make sense for context-aware
        stores.

        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError

    # Optional Namespace methods

    def bind(self, prefix: str, namespace: URIRef, override: bool = True) -> None:
        """
        Bind ``prefix`` to ``namespace``. The base implementation does
        nothing.

        :param override: rebind, even if the given namespace is already bound
            to another prefix.
        """

    def prefix(self, namespace: URIRef) -> Optional[str]:
        """
        Placeholder for looking up the prefix of ``namespace``; the base
        implementation has no body.
        """

    def namespace(self, prefix: str) -> Optional[URIRef]:
        """
        Placeholder for looking up the namespace of ``prefix``; the base
        implementation has no body.
        """

    def namespaces(self) -> Iterator[Tuple[str, URIRef]]:
        """
        Iterate over ``(prefix, namespace)`` pairs; the base implementation
        yields nothing.
        """
        # Delegating to an empty tuple makes this function an empty generator.
        yield from ()

    # Optional Transactional methods

    def commit(self) -> None:
        """
        Placeholder for committing a transaction; the base implementation
        does nothing.
        """

    def rollback(self) -> None:
        """
        Placeholder for rolling back a transaction; the base implementation
        does nothing.
        """

    # Optional graph methods

    def add_graph(self, graph: Graph) -> None:
        """
        Add a graph to the store, no effect if the graph already exists.

        :param graph: a Graph instance
        :raises Exception: always, in this base implementation
        """
        raise Exception("Graph method called on non-graph_aware store")

    def remove_graph(self, graph: Graph) -> None:
        """
        Remove a graph from the store; this should also remove all triples
        in the graph.

        :param graph: a Graph instance
        :raises Exception: always, in this base implementation
        """
        raise Exception("Graph method called on non-graph_aware store")