Source code for rdflib.plugins.stores.concurrent
from threading import Lock
class ResponsibleGenerator(object):
    """A generator that will help clean up when it is done being used.

    Wraps the iterator ``gen`` and calls ``cleanup`` (a zero-argument
    callable) from ``__del__``, i.e. when this wrapper object is finalised.

    Both the Python 3 (``__next__``) and the legacy Python 2 (``next``)
    iterator protocols are supported.
    """

    __slots__ = ['cleanup', 'gen']

    def __init__(self, gen, cleanup):
        """Store the wrapped iterator ``gen`` and the ``cleanup`` callable."""
        self.cleanup = cleanup
        self.gen = gen

    def __del__(self):
        """Run the cleanup callable when the wrapper is finalised."""
        self.cleanup()

    def __iter__(self):
        """Return ``self``; the wrapper is its own iterator."""
        return self

    def __next__(self):
        """Return the next item of the wrapped iterator.

        Propagates ``StopIteration`` from the wrapped iterator unchanged.
        """
        # The built-in next() dispatches to whichever protocol method the
        # wrapped iterator provides, so plain iterators work on Python 3.
        return next(self.gen)

    # Python 2 spelling of the iterator protocol, kept for existing callers.
    next = __next__
class ConcurrentStore(object):
    """Wrap a store so it can be modified while it is being iterated.

    While at least one ``triples()`` iteration is in progress, calls to
    ``add`` and ``remove`` are queued instead of being applied to the
    wrapped store. The queued changes are applied when the number of
    in-progress iterations drops back to zero.
    """

    def __init__(self, store):
        """Wrap ``store``, which must provide ``add``, ``remove``,
        ``triples`` and ``__len__``."""
        self.store = store

        # number of calls to visit still in progress
        self.__visit_count = 0

        # lock for locking down the indices
        self.__lock = Lock()

        # lists for keeping track of added and removed triples while
        # we wait for the lock
        self.__pending_removes = []
        self.__pending_adds = []

    def add(self, triple):
        """Add ``triple`` to the store, or queue it if a read is active."""
        (s, p, o) = triple
        if self.__visit_count == 0:
            self.store.add((s, p, o))
        else:
            self.__pending_adds.append((s, p, o))

    def remove(self, triple):
        """Remove ``triple`` from the store, or queue the removal if a
        read is active."""
        (s, p, o) = triple
        if self.__visit_count == 0:
            self.store.remove((s, p, o))
        else:
            self.__pending_removes.append((s, p, o))

    def triples(self, triple):
        """Yield the triples of the wrapped store matching ``triple``.

        ``triple`` is an ``(s, p, o)`` pattern that is passed through to
        the wrapped store; ``None`` in a position acts as a wildcard when
        matching queued additions. Triples whose removal is queued are
        skipped, and queued additions matching the pattern are yielded
        after the stored triples.
        """
        (su, pr, ob) = triple
        g = self.store.triples((su, pr, ob))
        # Same list object that remove()/__end_read mutate in place, so
        # removals queued during the iteration are seen here.
        pending_removes = self.__pending_removes
        self.__begin_read()
        # The wrapper calls __end_read from its __del__, i.e. once the
        # loop lets go of it (including when iteration is abandoned).
        for s, p, o in ResponsibleGenerator(g, self.__end_read):
            if (s, p, o) not in pending_removes:
                yield s, p, o

        # Additions still queued at this point (not yet flushed into the
        # store) are reported if they match the requested pattern.
        for (s, p, o) in self.__pending_adds:
            if (su is None or su == s) \
                    and (pr is None or pr == p) \
                    and (ob is None or ob == o):
                yield s, p, o

    def __len__(self):
        """Return the length reported by the wrapped store."""
        return self.store.__len__()

    def __begin_read(self):
        """Register one more in-progress read."""
        with self.__lock:
            self.__visit_count = self.__visit_count + 1

    def __end_read(self):
        """Unregister a read and flush queued changes after the last one.

        The lock is held via ``with`` so that it is released even when the
        wrapped store raises while the queued changes are applied.
        """
        with self.__lock:
            self.__visit_count = self.__visit_count - 1
            if self.__visit_count == 0:
                pending_removes = self.__pending_removes
                while pending_removes:
                    (s, p, o) = pending_removes.pop()
                    try:
                        self.store.remove((s, p, o))
                    except Exception:
                        # Best effort: a queued removal of a triple the
                        # store cannot remove is reported, not fatal.
                        print("%s %s %s Not in store to remove" % (s, p, o))
                pending_adds = self.__pending_adds
                while pending_adds:
                    (s, p, o) = pending_adds.pop()
                    self.store.add((s, p, o))