Module tf.search.graph
Graph oriented functions needed for search
Functions
def connectedness(searchExe)
-
Expand source code Browse git
def connectedness(searchExe): error = searchExe.api.TF.error qnodes = searchExe.qnodes qedges = searchExe.qedges _msgCache = searchExe._msgCache componentIndex = dict(((q, {q}) for q in range(len(qnodes)))) for (f, rela, t) in qedges: if f != t: componentIndex[f] |= componentIndex[t] componentIndex[t] = componentIndex[f] for u in componentIndex[f] - {f}: componentIndex[u] = componentIndex[f] components = sorted({frozenset(c) for c in componentIndex.values()}) componentIndex = {} for c in components: for q in c: componentIndex[q] = c componentEdges = {} for (e, (f, rela, t)) in enumerate(qedges): c = componentIndex[f] componentEdges.setdefault(c, []).append(e) searchExe.components = [] for c in components: searchExe.components.append([sorted(c), componentEdges.get(c, [])]) lComps = len(searchExe.components) if lComps == 0: error("Search without instructions. Tell me what to look for.", cache=_msgCache) searchExe.good = False elif lComps > 1: error( f"More than one connected components ({len(searchExe.components)}):", cache=_msgCache, ) error( "Either run the subqueries one by one, or connect the components by a relation", tm=False, cache=_msgCache, ) searchExe.good = False
def displayEdge(searchExe, e, dir, nodesSeen)
-
Expand source code Browse git
def displayEdge(searchExe, e, dir, nodesSeen): info = searchExe.api.TF.info _msgCache = searchExe._msgCache qnodes = searchExe.qnodes qedges = searchExe.qedges converse = searchExe.converse relations = searchExe.relations spreads = searchExe.spreads spreadsC = searchExe.spreadsC thinned = getattr(searchExe, "thinned", set()) (f, rela, t) = qedges[e] if type(f) is not tuple: f = (f,) if type(t) is not tuple: t = (t,) if type(rela) is not tuple: rela = (rela,) if dir == -1: (f, rela, t) = (t, tuple(converse[r] for r in rela), f) nodesInvolved = set(chain(f, t)) seen = all(q in nodesSeen for q in nodesInvolved) thinRep = "" if seen else " (thinned)" if e in thinned else "" spread = ( f"{0:>6} " if seen else f"{spreads.get(e, -1) if dir == 1 else spreadsC.get(e, -1):8.1f}" ) info( "edge {:>8}-{:<13} {:^8} {:>2}-{:<13} {} choices{}".format( ",".join(str(x) for x in f), ",".join({qnodes[x][0] for x in f}), ",".join(relations[x]["acro"] for x in rela), ",".join(str(x) for x in set(t)), ",".join(qnodes[x][0] for x in set(t)), spread, thinRep, ), tm=False, cache=_msgCache, ) return nodesInvolved
def displayNode(searchExe, q, pos2=False)
-
Expand source code Browse git
def displayNode(searchExe, q, pos2=False): info = searchExe.api.TF.info _msgCache = searchExe._msgCache qnodes = searchExe.qnodes yarns = searchExe.yarns space = " " * 31 nodeInfo = ( "node {} {:>2}-{:<13} {:>6} choices".format( space, q, qnodes[q][0], len(yarns[q]), ) if pos2 else "node {:>2}-{:<13} {} {:>6} choices".format( q, qnodes[q][0], space, len(yarns[q]), ) ) info(nodeInfo, tm=False, cache=_msgCache)
def displayPlan(searchExe, details=False)
-
Expand source code Browse git
def displayPlan(searchExe, details=False): if not searchExe.good: return api = searchExe.api TF = api.TF setSilent = TF.setSilent isSilent = TF.isSilent info = TF.info wasSilent = isSilent() setSilent(False) _msgCache = searchExe._msgCache nodeLine = searchExe.nodeLine qedges = searchExe.qedges (qs, es) = searchExe.stitchPlan offset = searchExe.offset if details: info( f"Search with {len(qs)} objects and {len(es)} relations", tm=False, cache=_msgCache, ) info( "Results are instantiations of the following objects:", tm=False, cache=_msgCache, ) for q in qs: displayNode(searchExe, q) if len(es) != 0: info("Performance parameters:", tm=False, cache=_msgCache) for (k, v) in searchExe.perfParams.items(): info(f"\t{k:<20} = {v:>7}", tm=False, cache=_msgCache) info( "Instantiations are computed along the following relations:", tm=False, cache=_msgCache, ) (firstE, firstDir) = es[0] (f, rela, t) = qedges[firstE] if firstDir == -1: (f, t) = (t, f) displayNode(searchExe, f, pos2=True) nodesSeen = {f} for e in es: nodesSeen |= displayEdge(searchExe, *e, nodesSeen) info( "The results are connected to the original search template as follows:", cache=_msgCache, ) resultNode = {} for q in qs: resultNode[nodeLine[q]] = q for (i, line) in enumerate(searchExe.searchLines): rNode = resultNode.get(i, "") prefix = "" if rNode == "" else "R" info(f"{i + offset:>2} {prefix:<1}{rNode:<2} {line}", tm=False, cache=_msgCache) setSilent(wasSilent)
def multiEdges(searchExe)
-
Expand source code Browse git
def multiEdges(searchExe): relations = searchExe.relations qedges = searchExe.qedges _msgCache = searchExe._msgCache api = searchExe.api error = api.TF.error medgesIndex = {} # will be a dict keyed by edge destination, then by upper / lower bound # and then the values are directed edges for (e, (f, rela, t)) in enumerate(qedges): relInfo = relations[rela] acro = relInfo.get("name", relInfo["acro"]) if acro in HALFBOUNDED: dir = HALFBOUNDED[acro] # edge e,1 arrives at t which acts as an upper bound medgesIndex.setdefault(t, {}).setdefault(dir, set()).add((e, 1)) # edge e,-1 arrives at f which acts as a lower bound medgesIndex.setdefault(f, {}).setdefault(-dir, set()).add((e, -1)) medges = [] for eInfo in medgesIndex.values(): if 1 in eInfo and -1 in eInfo: es = eInfo[1] | eInfo[-1] ts = {qedges[e][2 if dir == 1 else 0] for (e, dir) in es} if len(ts) != 1: # if this happens, it is a fault in the business logic, not caused by the user eRep = " | ".join(str(qedges[e]) for (e, dir) in es) error( f"Multi-edge with {len(ts)} destinations: {eRep}", tm=False, cache=_msgCache, ) medges.append(es) # so medges is a collection sets of edges # each set consists of directed edges that have the same qnode as destination searchExe.medges = medges