Python networkx 模块,descendants() 实例源码

我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用networkx.descendants()

项目:revex    作者:lucaswiman    | 项目源码 | 文件源码
def _acceptable_subgraph(self):  # type:  () -> nx.MultiDiGraph
        graph = self.as_multidigraph
        reachable_states = nx.descendants(graph, self.start) | {self.start}
        graph = graph.subgraph(reachable_states)
        reachable_accepting_states = reachable_states & {
            node for node in graph.node if graph.node[node]['accepting']
        }

        # Add a "sink" node with an in-edge from every accepting state. This is
        # is solely done because the networkx API makes it easier to find the
        # ancestor of a node than a set of nodes.
        sink = object()
        graph.add_node(sink)
        for state in reachable_accepting_states:
            graph.add_edge(state, sink)

        acceptable_sates = nx.ancestors(graph, sink)
        return graph.subgraph(acceptable_sates)
项目:revex    作者:lucaswiman    | 项目源码 | 文件源码
def live_subgraph(self):  # type: () -> nx.MultiDiGraph
        """
        Returns the graph of "live" states for this graph, i.e. the start state
        together with states that may be involved in positively matching a string
        (reachable from the start node and an ancestor of an accepting node).

        This is intended for display purposes, only showing the paths which
        might lead to an accepting state, or just the start state if no such
        paths exist.
        """
        graph = self.as_multidigraph
        accepting_states = {
            node for node in graph.node if graph.node[node]['accepting']
        }

        # Add a "sink" node with an in-edge from every accepting state. This is
        # is solely done because the networkx API makes it easier to find the
        # ancestor of a node than a set of nodes.
        sink = object()
        graph.add_node(sink)
        for state in accepting_states:
            graph.add_edge(state, sink)

        live_states = {self.start} | (nx.ancestors(graph, sink) & nx.descendants(graph, self.start))
        return graph.subgraph(live_states)
项目:seqenv    作者:xapple    | 项目源码 | 文件源码
def print_test(self, e=None):
        """Just a method to see a bit how the different libraries work."""
        # Test node #
        if e is None: e = test_envos[0]
        # Goa #
        print "Goa: "
        print self.goatools[e]
        # Pygraphviz #
        print "pygraphviz: "
        print self.pygraphviz[e]
        print self.pygraphviz.successors(e)
        print self.pygraphviz.predecessors(e)
        print self.pygraphviz.get_node(e)
        # Networkx #
        import networkx
        print "networkx: "
        print self.networkx[e]
        print self.networkx.successors(e)
        print self.networkx.predecessors(e)
        print networkx.ancestors(self.networkx, e)   # same as predecessors
        print networkx.descendants(self.networkx, e) # almost as child_to_parents
项目:symd    作者:xym-tool    | 项目源码 | 文件源码
def print_impacting_modules(single_node=None):
    """
    For each module, print a list of modules that the module is depending on,
    i.e. modules whose change can potentially impact the module. The function
    shows all levels of dependency, not just the immediately imported
    modules.
    :return:
    """
    print('\n===Impacting Modules===')
    for node_name in G.nodes_iter():
        if single_node and (node_name!=single_node):
            continue
        descendants = nx.descendants(G, node_name)
        print(augment_format_string(node_name, '\n%s:') % node_name)
        for d in descendants:
            print(augment_format_string(d, '    %s') % d)
项目:silverchain    作者:tomokinakamaru    | 项目源码 | 文件源码
def _create_eps_closure_func(table):
    edges = {(c.src, c.sym.text, c.dst[0]) for c in table.cells}
    graph = DiGraph((src, dst) for src, sym, dst in edges if sym == '')
    states = set(sum(((src, dst) for src, _, dst in edges), ()))

    cs = {s: descendants(graph, s) if s in graph else set() for s in states}
    return lambda ss: frozenset(set(ss).union(*(cs[s] for s in ss)))
项目:qiskit-sdk-py    作者:QISKit    | 项目源码 | 文件源码
def remove_descendants_of(self, node):
        """Remove all of the descendant operation nodes of node."""
        dec = nx.descendants(self.multi_graph, node)
        for n in dec:
            nd = self.multi_graph.node[n]
            if nd["type"] == "op":
                self._remove_op_node(n)
项目:qiskit-sdk-py    作者:QISKit    | 项目源码 | 文件源码
def remove_nondescendants_of(self, node):
        """Remove all of the non-descendants operation nodes of node."""
        dec = nx.descendants(self.multi_graph, node)
        comp = list(set(self.multi_graph.nodes()) - set(dec))
        for n in comp:
            nd = self.multi_graph.node[n]
            if nd["type"] == "op":
                self._remove_op_node(n)
项目:yadage    作者:diana-hep    | 项目源码 | 文件源码
def collective_downstream(workflow, steps):
    downstream = set()
    for step in steps:
        for x in nx.descendants(workflow.dag, step):
            downstream.add(x)
    return list(downstream)
项目:dbt    作者:fishtown-analytics    | 项目源码 | 文件源码
def get_nodes_from_spec(graph, spec):
    select_parents = spec['select_parents']
    select_children = spec['select_children']
    qualified_node_name = spec['qualified_node_name']

    selected_nodes = set(get_nodes_by_qualified_name(graph,
                                                     qualified_node_name))

    additional_nodes = set()
    test_nodes = set()

    if select_parents:
        for node in selected_nodes:
            parent_nodes = nx.ancestors(graph, node)
            additional_nodes.update(parent_nodes)

    if select_children:
        for node in selected_nodes:
            child_nodes = nx.descendants(graph, node)
            additional_nodes.update(child_nodes)

    model_nodes = selected_nodes | additional_nodes

    for node in model_nodes:
        # include tests that depend on this node. if we aren't running tests,
        # they'll be filtered out later.
        child_tests = [n for n in graph.successors(node)
                       if graph.node.get(n).get('resource_type') ==
                       NodeType.Test]
        test_nodes.update(child_tests)

    return model_nodes | test_nodes
项目:dbt    作者:fishtown-analytics    | 项目源码 | 文件源码
def get_dependent_nodes(self, node):
        return nx.descendants(self.graph, node)
项目:dbt    作者:fishtown-analytics    | 项目源码 | 文件源码
def remove_node(self, node):
        children = nx.descendants(self.graph, node)
        self.graph.remove_node(node)
        return children
项目:seqenv    作者:xapple    | 项目源码 | 文件源码
def get_subgraph(self, envos=None):
        """Given a list of ENVO terms, get the subgraph that contains them all
        and all their ancestors, up to the root.
        Outputs a networkx DiGraph object."""
        # Testing mode #
        if envos is None: envos = test_envos
        # All nodes #
        nodes = set(n for e in envos for n in networkx.descendants(self.networkx, e))
        nodes.update(envos)
        nodes = list(nodes)
        # Return #
        return self.networkx.subgraph(nodes)
项目:symd    作者:xym-tool    | 项目源码 | 文件源码
def print_impacting_modules(single_node=None, json_out=None):
    """
    For each module, print a list of modules that the module is depending on,
    i.e. modules whose change can potentially impact the module. The function
    shows all levels of dependency, not just the immediately imported
    modules.  If the json_out argument is not None, then the output will be
    recorded there instead of on stdout.
    :return:
    """
    if json_out is None:
        print('\n===Impacting Modules===')
    else:
        json_out['impacting_modules'] = {}
    for node_name in G.nodes_iter():
        if single_node and (node_name!=single_node):
            continue
        descendants = nx.descendants(G, node_name)
        if json_out is None:
            print(augment_format_string(node_name, '\n%s:') % node_name)
        else:
            json_out['impacting_modules'][node_name] = []
        for d in descendants:
            if json_out is None:
                print(augment_format_string(d, '    %s') % d)
            else:
                json_out['impacting_modules'][node_name].append(d)
项目:symd    作者:xym-tool    | 项目源码 | 文件源码
def prune_standalone_nodes():
    """
    Remove from the module dependency graph all modules that do not have any
    dependencies (i.e they neither import/include any modules nor are they
    imported/included by any modules)
    :return: the connected module dependency graph
    """
    ng = nx.DiGraph(G)
    for node_name in G.nodes_iter():
        ancestors = nx.ancestors(G, node_name)
        descendants = nx.descendants(G, node_name)
        if len(ancestors) == 0 and len(descendants) == 0:
            ng.remove_node(node_name)
    return ng
项目:symd    作者:xym-tool    | 项目源码 | 文件源码
def prune_standalone_nodes():
    """
    Remove from the module dependency graph all modules that do not have any
    dependencies (i.e they neither import/include any modules nor are they
    imported/included by any modules)
    :return: the connected module dependency graph
    """
    ng = nx.DiGraph(G)
    for node_name in G.nodes_iter():
        ancestors = nx.ancestors(G, node_name)
        descendants = nx.descendants(G, node_name)
        if len(ancestors) == 0 and len(descendants) == 0:
            ng.remove_node(node_name)
    return ng
项目:graphkit    作者:yahoo    | 项目源码 | 文件源码
def _find_necessary_steps(self, outputs, inputs):
        """
        Determines what graph steps need to pe run to get to the requested
        outputs from the provided inputs.  Eliminates steps that come before
        (in topological order) any inputs that have been provided.  Also
        eliminates steps that are not on a path from the provided inputs to
        the requested outputs.

        :param list outputs:
            A list of desired output names.  This can also be ``None``, in which
            case the necessary steps are all graph nodes that are reachable
            from one of the provided inputs.

        :param dict inputs:
            A dictionary mapping names to values for all provided inputs.

        :returns:
            Returns a list of all the steps that need to be run for the
            provided inputs and requested outputs.
        """

        if not outputs:

            # If caller requested all outputs, the necessary nodes are all
            # nodes that are reachable from one of the inputs.  Ignore input
            # names that aren't in the graph.
            necessary_nodes = set()
            for input_name in iter(inputs):
                if self.graph.has_node(input_name):
                    necessary_nodes |= nx.descendants(self.graph, input_name)

        else:

            # If the caller requested a subset of outputs, find any nodes that
            # are made unecessary because we were provided with an input that's
            # deeper into the network graph.  Ignore input names that aren't
            # in the graph.
            unnecessary_nodes = set()
            for input_name in iter(inputs):
                if self.graph.has_node(input_name):
                    unnecessary_nodes |= nx.ancestors(self.graph, input_name)

            # Find the nodes we need to be able to compute the requested
            # outputs.  Raise an exception if a requested output doesn't
            # exist in the graph.
            necessary_nodes = set()
            for output_name in outputs:
                if not self.graph.has_node(output_name):
                    raise ValueError("graphkit graph does not have an output "
                                     "node named %s" % output_name)
                necessary_nodes |= nx.ancestors(self.graph, output_name)

            # Get rid of the unnecessary nodes from the set of necessary ones.
            necessary_nodes -= unnecessary_nodes

        # Return an ordered list of the needed steps.
        return [step for step in self.steps if step in necessary_nodes]
项目:revex    作者:lucaswiman    | 项目源码 | 文件源码
def longest_string(self):  # type: () -> String
        """
        Returns an example of a maximally long string recognized by this DFA.

        If the language is infinite, raises InfiniteLanguageError.
        If the language is empty, raises EmptyLanguageError.

        The algorithm is similar to the one described in these lecture notes
        for deciding whether a language is finite:
        http://math.uaa.alaska.edu/~afkjm/cs351/handouts/non-regular.pdf
        """

        # Compute what we're calling the "acceptable subgraph" by restricting to
        # states which are (1) descendants of a start state, and (2) ancestors of
        # an accepting state. These two properties imply that there is at least
        # one walk between these two states, corresponding to a string present in
        # the language.
        acceptable_subgraph = self._acceptable_subgraph
        if len(acceptable_subgraph.node) == 0:
            # If this graph is _empty_, then the language is empty.
            raise EmptyLanguageError()

        # Otherwise, we try to find the longest path in it. Internally, networkx
        # does this by topologically sorting the graph (which only works if it's
        # a DAG), then using the sorted graph to construct the longest path in
        # linear time.
        try:
            longest_path = nx.algorithms.dag.dag_longest_path(
                acceptable_subgraph)
        except nx.NetworkXUnfeasible:
            # If a topological sort is not possible, this means there is a
            # cycle, and the recognized language is infinite. In this case,
            # nx raises ``nx.NetworkXUnfeasible``.
            raise InfiniteLanguageError()

        # To show that the longest path must originate at the start node,
        # consider 3 cases for the position of s in a longest path P from u to v:
        #
        # (a) At the beginning. Done; this is what we were seeking to prove.
        # (b) On the path, but not at the beginning. In this case, u is
        #     reachable from s (by property (1) above), and s in reachable from
        #     u (since s is on a path from u to v). This means the graph
        #     contains a cycle, which contradicts that we've constructed a
        #     topological sort on it.
        # (c) Disjoint from s. Let P' be a path connecting s to u (which must
        #     exist by property (1)). If this path contains a vertex u'!=u in P,
        #     then P ? P' contains a cycle (from u to u' on P and from u' to u
        #     on P'), which is a contradiction. But then P ? P' is a path, which
        #     contains at least one more vertex than P (in particular, s), and
        #     so is a longer path, which contradicts the maximality assumption.

        chars = []
        for state1, state2 in zip(longest_path, longest_path[1:]):
            edges = self.succ[state1][state2]
            chars.append(next(six.itervalues(edges))['transition'])
        return type(self.alphabet[0])().join(chars)