# Source code for rdflib.tools.graphisomorphism

"""
A command-line tool for testing whether RDF graphs are isomorphic, i.e. equal
if BNode labels are ignored.
"""

from __future__ import absolute_import

from rdflib import Graph
from rdflib import BNode
from itertools import combinations


class IsomorphicTestableGraph(Graph):
    """
    Graph whose ``==`` / ``!=`` operators test for isomorphism, i.e. equality
    when BNode labels are ignored.

    Ported from:
     http://www.w3.org/2001/sw/DataAccess/proto-tests/tools/rdfdiff.py
     (Sean B Palmer's RDF Graph Isomorphism Tester)
    """

    def __init__(self, **kargs):
        """Create the graph, forwarding all keyword arguments to ``Graph``."""
        super(IsomorphicTestableGraph, self).__init__(**kargs)
        # Not read by any method of this class; kept because it is a
        # publicly visible attribute.
        self.hash = None

    def __hash__(self):
        """
        Return the hash provided by the base class.

        On Python 3 a class that defines ``__eq__`` without ``__hash__``
        gets ``__hash__ = None`` and its instances become unhashable, so
        they could no longer be used as dict keys or set members.  Defining
        it explicitly keeps instances hashable.

        The structural hash (``internal_hash``) is deliberately NOT used
        here; see its docstring.  As a consequence two graphs that compare
        equal are not guaranteed to hash alike, so do not rely on dict/set
        de-duplication of isomorphic graphs.
        """
        return super(IsomorphicTestableGraph, self).__hash__()

    def internal_hash(self):
        """
        This is defined instead of __hash__ to avoid a circular recursion
        scenario with the Memory store for rdflib which requires a hash lookup
        in order to return a generator of triples
        """
        return hash(tuple(sorted(self.hashtriples())))

    def hashtriples(self):
        """
        Yield one hash per triple of the graph.

        Before hashing, every BNode in the triple is replaced by its
        label-independent ``vhash``; all other terms are used as they are.
        """
        for triple in self:
            # vhash() of a BNode taken from a triple of this graph always
            # contains at least that triple, so it is never empty.
            g = (self.vhash(t) if isinstance(t, BNode) else t for t in triple)
            yield hash(tuple(g))

    def vhash(self, term, done=False):
        """
        Return a sorted tuple describing every triple that mentions ``term``.

        :param term: the term (normally a BNode) to describe.
        :param done: passed through to ``vhashtriple``; when true, other
            BNodes are not expanded any further.
        """
        # NOTE(review): sorting relies on the yielded tuples being mutually
        # orderable; they can mix graph terms, ints and nested tuples at the
        # same position -- confirm this holds on Python 3.
        return tuple(sorted(self.vhashtriples(term, done)))

    def vhashtriples(self, term, done):
        """Yield the ``vhashtriple`` tuple of each triple containing ``term``."""
        for t in self:
            if term in t:
                yield tuple(self.vhashtriple(t, term, done))

    def vhashtriple(self, triple, term, done):
        """
        Yield a label-independent stand-in for each of the three positions
        of ``triple``:

        * a non-BNode is yielded unchanged;
        * a BNode equal to ``term`` (or any BNode when ``done`` is true) is
          replaced by its position index 0, 1 or 2;
        * any other BNode is replaced by its own ``vhash``, computed with
          ``done=True`` so the expansion stops after one level.
        """
        for p in range(3):
            if not isinstance(triple[p], BNode):
                yield triple[p]
            elif done or (triple[p] == term):
                yield p
            else:
                yield self.vhash(triple[p], done=True)

    def __eq__(self, G):
        """Graph isomorphism testing."""
        if not isinstance(G, IsomorphicTestableGraph):
            return False
        if len(self) != len(G):
            return False
        # Fast path: both graphs yield identical triples in identical order.
        if list(self) == list(G):
            return True  # @@
        return self.internal_hash() == G.internal_hash()

    def __ne__(self, G):
        """Negative graph isomorphism testing."""
        return not self.__eq__(G)
def main():
    """
    Command-line entry point: check that all given RDF documents are
    isomorphic to each other.

    Every file named on the command line (plus STDIN when ``-s`` /
    ``--stdin`` is given) is parsed into an ``IsomorphicTestableGraph``
    using the format selected with ``--format``; every pair of the
    resulting graphs is then compared with ``==``.

    :raises AssertionError: if two of the loaded graphs do not compare
        equal; the message names the two inputs as ``"<a> != <b>"``.
    """
    import sys
    from optparse import OptionParser

    usage = '''usage: %prog [options] file1 file2 ... fileN'''
    op = OptionParser(usage=usage)
    op.add_option('-s',
                  '--stdin',
                  action='store_true',
                  default=False,
                  help='Load from STDIN as well')
    op.add_option('--format',
                  default='xml',
                  dest='inputFormat',
                  metavar='RDF_FORMAT',
                  choices=['xml', 'trix', 'n3', 'nt', 'rdfa'],
                  help="The format of the RDF document(s) to compare. " +
                       "One of 'xml','n3','trix', 'nt', " +
                       "or 'rdfa'. The default is %default")

    (options, args) = op.parse_args()

    # (label, graph) pairs.  The graphs are deliberately not used as dict
    # keys or set members: the graph class overrides __eq__, so hashing it
    # is not something this function should depend on.
    named_graphs = []
    if options.stdin:
        graph = IsomorphicTestableGraph().parse(
            sys.stdin, format=options.inputFormat)
        named_graphs.append(('(STDIN)', graph))
    for fn in args:
        graph = IsomorphicTestableGraph().parse(
            fn, format=options.inputFormat)
        named_graphs.append((fn, graph))

    # combinations() already yields each unordered pair exactly once, so no
    # extra bookkeeping of visited pairs is needed.
    for (name1, graph1), (name2, graph2) in combinations(named_graphs, 2):
        # Raise explicitly instead of using ``assert`` so the check is not
        # stripped when Python runs with -O.
        if not (graph1 == graph2):
            raise AssertionError("%s != %s" % (name1, name2))
# Run the comparison tool only when this module is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    main()