Source code for rdflib.plugins.stores.concurrent

from threading import Lock


class ResponsibleGenerator:
    """Iterator wrapper that runs a cleanup callable when it is finalized.

    Iteration is delegated to the wrapped iterator ``gen``. The ``cleanup``
    callable is invoked from ``__del__`` -- that is, when this wrapper object
    is finalized -- and not when ``gen`` is exhausted.
    """

    __slots__ = ["cleanup", "gen"]

    def __init__(self, gen, cleanup):
        """Store the iterator to wrap and the zero-argument cleanup callable."""
        self.gen = gen
        self.cleanup = cleanup

    def __iter__(self):
        """Return the wrapper itself; it is its own iterator."""
        return self

    def __next__(self):
        """Return the next item of the wrapped iterator.

        Whatever ``next(self.gen)`` raises (including ``StopIteration``)
        propagates unchanged.
        """
        item = next(self.gen)
        return item

    def __del__(self):
        """Invoke the cleanup callable when the wrapper is finalized."""
        self.cleanup()
class ConcurrentStore:
    """Wrap a store so that mutations are deferred while reads are active.

    A counter tracks how many ``triples()`` iterations are in progress.
    While the counter is zero, ``add`` and ``remove`` are forwarded straight
    to the wrapped store. Otherwise they are queued and applied to the
    wrapped store when the last active read finishes.

    NOTE(review): ``add`` and ``remove`` inspect the counter without taking
    the lock; only the counter updates and the flush of the queues are done
    under the lock.
    """

    def __init__(self, store):
        """Wrap ``store``, which must provide add/remove/triples/__len__."""
        self.store = store

        # number of calls to visit still in progress
        self.__visit_count = 0

        # lock for locking down the indices
        self.__lock = Lock()

        # lists for keeping track of added and removed triples while
        # we wait for the lock
        self.__pending_removes = []
        self.__pending_adds = []

    def add(self, triple):
        """Add ``triple`` now, or queue it if any read is in progress."""
        (s, p, o) = triple
        if self.__visit_count == 0:
            self.store.add((s, p, o))
        else:
            self.__pending_adds.append((s, p, o))

    def remove(self, triple):
        """Remove ``triple`` now, or queue the removal if a read is active."""
        (s, p, o) = triple
        if self.__visit_count == 0:
            self.store.remove((s, p, o))
        else:
            self.__pending_removes.append((s, p, o))

    def triples(self, triple):
        """Yield the triples matching the ``(s, p, o)`` pattern ``triple``.

        Results of the wrapped store are yielded first, skipping any triple
        whose removal is currently queued. Afterwards, queued additions that
        match the pattern are yielded; ``None`` in a pattern position
        matches anything.
        """
        (su, pr, ob) = triple
        g = self.store.triples((su, pr, ob))
        pending_removes = self.__pending_removes
        self.__begin_read()
        # The wrapper calls __end_read when it is finalized, which ends this
        # read even if the consumer abandons the iteration early.
        for s, p, o in ResponsibleGenerator(g, self.__end_read):
            if (s, p, o) not in pending_removes:
                yield s, p, o

        for s, p, o in self.__pending_adds:
            if (
                (su is None or su == s)
                and (pr is None or pr == p)
                and (ob is None or ob == o)
            ):
                yield s, p, o

    def __len__(self):
        """Return the length reported by the wrapped store."""
        return self.store.__len__()

    def __begin_read(self):
        """Register one more read in progress."""
        with self.__lock:
            self.__visit_count = self.__visit_count + 1

    def __end_read(self):
        """Unregister a read; on the last one, apply the queued mutations.

        The lock is held through a ``with`` block so that it is released
        even if the wrapped store raises while the queues are flushed.
        """
        with self.__lock:
            self.__visit_count = self.__visit_count - 1
            if self.__visit_count == 0:
                pending_removes = self.__pending_removes
                while pending_removes:
                    (s, p, o) = pending_removes.pop()
                    try:
                        self.store.remove((s, p, o))
                    except Exception:
                        # Best effort: a failed removal is reported and the
                        # remaining queue is still processed.
                        print(s, p, o, "Not in store to remove")
                pending_adds = self.__pending_adds
                while pending_adds:
                    (s, p, o) = pending_adds.pop()
                    self.store.add((s, p, o))