Python networkx 模块,simple_cycles() 实例源码

我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用networkx.simple_cycles()

项目:siggi    作者:rieck    | 项目源码 | 文件源码
def bag_of_elementary_cycles(graph):
    """Count the elementary cycles of ``graph`` by their label sequence.

    Each cycle yielded by ``nx.simple_cycles`` is converted to the list of
    its node labels (``node_label`` applied to the node's attribute dict),
    rotated so that the smallest label comes first, and joined with ``'-'``.

    Returns a dict mapping every joined label string to the number of
    cycles that produced it.
    """
    bag = {}
    for cycle in nx.simple_cycles(graph):
        # Build a real list: on Python 3 ``map`` returns an iterator, which
        # supports neither slicing nor ``extend``.
        labels = [node_label(graph.node[x]) for x in cycle]

        # Rotate the cycle so that the first occurrence of the smallest
        # label leads.
        start = min(enumerate(labels), key=lambda pair: pair[1])[0]
        labels = labels[start:] + labels[:start]

        key = '-'.join(labels)
        bag[key] = bag.get(key, 0) + 1

    return bag
项目:PedWorks    作者:BrnCPrz    | 项目源码 | 文件源码
def cycle_detect(G):
    """Report the cycles of ``G`` and optionally remove one node per cycle.

    The first node of every simple cycle is printed, then the user is asked
    whether those nodes should be removed.

    * ``y``: the first node of each cycle is removed from ``G`` (in place)
      and ``G`` is returned.
    * ``n`` or any other answer: a message is printed and the process is
      terminated through ``sys.exit()``.
    """
    # G = nx.DiGraph object
    cycle_list = list(nx.simple_cycles(G))
    # Single-argument print calls produce the same output as the former
    # print statements and parse on both Python 2 and Python 3.
    print('\n\tCycles detected for individuals: ')
    for cycle in cycle_list:
        print('\t%s' % (cycle[0],))

    rmCyNode = raw_input('\n\t> Remove nodes? (y/n): ')
    print(rmCyNode)
    if rmCyNode == 'y':
        for cycle in cycle_list:
            # Several cycles may start at the same node; removing a node
            # that is already gone would raise, so check membership first.
            if cycle[0] in G:
                G.remove_node(cycle[0])
        print('\n\tNodes removed with success.')
        print('\n\tProceeding with the algorithm.')
    elif rmCyNode == 'n':
        print('\n\tCorrect pedigree errors before proceeding..')
        print('\n\tClosing PedWork.py session...')
        print('\n\tExit > OK!')
        sys.exit()
    else:
        print('\n\tPlease enter y/n.')
        sys.exit()

    return G
项目:gym-kidney    作者:camoy    | 项目源码 | 文件源码
def embed(self, G, rng):
        """Estimate a cycle count of ``G`` by random sampling.

        ``self.sample_size`` node samples of size ``self.cycle_length`` are
        drawn with ``rng.choice``. A sample counts as a success when the
        subgraph induced by it contains a simple cycle of exactly
        ``self.cycle_length`` nodes.

        Returns a one-element float32 array holding
        ``binom(G.order(), cycle_length) * successes / sample_size``, or
        ``[0.0]`` when the graph has fewer nodes than ``cycle_length``.
        """
        if G.order() < self.cycle_length:
            return np.array([0.0], dtype="f")

        # Materialise the nodes once: the node container is loop-invariant
        # and a plain list is always accepted by ``choice``, whatever kind
        # of view ``G.nodes()`` returns.
        nodes = list(G.nodes())
        max_cycle = sp.binom(G.order(), self.cycle_length)

        succ = 0
        for _ in range(self.sample_size):
            # NOTE(review): if ``rng`` is a NumPy random state, ``choice``
            # samples with replacement by default -- confirm this is intended.
            us = rng.choice(nodes, self.cycle_length)
            H = G.subgraph(us.tolist())

            # ``any`` stops at the first cycle of the wanted length.
            if any(len(c) == self.cycle_length for c in nx.simple_cycles(H)):
                succ += 1

        val = [max_cycle * (succ / self.sample_size)]
        return np.array(val, dtype="f")
项目:gym-kidney    作者:camoy    | 项目源码 | 文件源码
def embed(self, G, rng):
        """Estimate a cycle count of ``G`` by sampling until enough successes.

        Up to ``self.sample_cap`` node samples of size ``self.cycle_length``
        are drawn with ``rng.choice``. A sample counts as a success when the
        subgraph induced by it contains a simple cycle of exactly
        ``self.cycle_length`` nodes. Sampling stops early once
        ``self.successes`` successes have been seen.

        Returns a one-element float32 array holding
        ``binom(G.order(), cycle_length) * successes / samples_drawn``, or
        ``[0.0]`` when the graph has fewer nodes than ``cycle_length``.
        """
        if G.order() < self.cycle_length:
            return np.array([0.0], dtype="f")

        # Materialise the nodes once: the node container is loop-invariant
        # and a plain list is always accepted by ``choice``, whatever kind
        # of view ``G.nodes()`` returns.
        nodes = list(G.nodes())
        max_cycle = sp.binom(G.order(), self.cycle_length)

        succ, samples = 0, 0
        for _ in range(self.sample_cap):
            # NOTE(review): if ``rng`` is a NumPy random state, ``choice``
            # samples with replacement by default -- confirm this is intended.
            us = rng.choice(nodes, self.cycle_length)
            H = G.subgraph(us.tolist())
            samples += 1

            # ``any`` stops at the first cycle of the wanted length.
            if any(len(c) == self.cycle_length for c in nx.simple_cycles(H)):
                succ += 1
            if succ >= self.successes:
                break

        return np.array([max_cycle * (succ / samples)], dtype="f")
项目:IDAPython-Scripts    作者:razygon    | 项目源码 | 文件源码
def _feature_loopcount(self, f_ea):
        '''
        Count the loops (simple cycles) of the graph in self.ftable["dg"].

        The cycles are enumerated with simple_cycles(), stored as a list on
        self.loops, and their number is returned.

        prior feature: Null
        '''
        DEBUG_PRINT( 'feature_loopcount' + hex(f_ea))
        # Materialise the result: simple_cycles() may return a generator,
        # which has no len().
        self.loops = list(simple_cycles(self.ftable["dg"]))
        return len(self.loops)
项目:skymod    作者:DelusionalLogic    | 项目源码 | 文件源码
def expand(self):
        """Expand the transaction targets into a full dependency set.

        Requires the transaction to be in the INIT state. Resolves the
        dependencies of ``self.targets``, builds the dependency graph in
        ``self.depend_G``, sorts the dependencies, schedules locally present
        versions of non-local packages for removal and finally moves the
        transaction to the EXPANDED state.

        Raises MissingDependencyError when dependencies cannot be resolved,
        TransactionCycleError when the dependency graph contains a cycle and
        ConflictError when conflicts with the local repository are found.
        """
        assert self.state == TransactionState.INIT

        expanded, missing = self._expand_depedencies(
            self.local_repo, self.repo, self.targets, exclude=self.removes
        )
        if len(missing) > 0:
            raise MissingDependencyError(missing)

        self.depend_G = super().packages_to_graph(expanded)

        # Only the first cycle is needed to reject the transaction.
        no_cycle = object()
        first_cycle = next(nx.simple_cycles(self.depend_G), no_cycle)
        if first_cycle is not no_cycle:
            raise TransactionCycleError(first_cycle)

        self._sort_deps_to_targets()

        # Packages that already exist locally (in any version) are
        # "upgrades": the local copy has to be removed first.
        for pkg in expanded:
            if pkg.is_local:
                continue
            # Query by bare name so that any local version matches.
            installed = self.local_repo.find_package(Query(pkg.name))
            if installed is not None:
                self.removes.append(installed)

        conflicts = super()._find_conflicts(self.installs, self.local_repo)
        if len(conflicts) > 0:
            raise ConflictError(conflicts)
        self.state = TransactionState.EXPANDED
项目:trading_package    作者:abrahamchaibi    | 项目源码 | 文件源码
def get_cycles_by_value(self, edge_type: EdgeType, quote_type: QuoteType) -> Dict[float, List[str]]:
        """Map the weight product of every simple cycle to that cycle.

        The price network for ``edge_type`` / ``quote_type`` is fetched, and
        each of its simple cycles is rotated so that the currency with the
        largest ``Currency[...].value`` comes first. The cycle is then closed
        by repeating its first element at the end. The dict key is the
        product of the ``'weight'`` attributes along the closed cycle;
        a later cycle with the same product replaces an earlier one.
        """
        graph = self.get_network(NetworkType.price, edge_type, quote_type)
        edge_weight = get_edge_attributes(graph, 'weight')
        by_value = {}
        for raw_cycle in simple_cycles(graph):
            # Rotate to a stable starting currency.
            pivot = raw_cycle.index(max(raw_cycle, key=lambda name: Currency[name].value))
            loop = raw_cycle[pivot:] + raw_cycle[:pivot]
            # Close the loop so consecutive pairs cover every edge.
            loop.append(loop[0])
            factors = [float(edge_weight[(src, dst)]) for src, dst in zip(loop, loop[1:])]
            by_value[prod(factors)] = loop
        return by_value
项目:idascripts    作者:ctfhacker    | 项目源码 | 文件源码
def loops(func):
    '''Count the simple cycles of the graph ``blocks.nx`` returns for ``func``.'''
    import networkx as nx
    graph = blocks.nx(by(func))
    # Tally the cycles one by one instead of building a list of them.
    return sum(1 for _ in nx.simple_cycles(graph))

# FIXME: document this
#def refs(func, member):
#    xl, fn = idaapi.xreflist_t(), by(func)
#    idaapi.build_stkvar_xrefs(xl, fn, member.ptr)
#    x.ea, x.opnum, x.type
#    ref_types = {
#        0  : 'Data_Unknown',
#        1  : 'Data_Offset',
#        2  : 'Data_Write',
#        3  : 'Data_Read',
#        4  : 'Data_Text',
#        5  : 'Data_Informational',
#        16 : 'Code_Far_Call',
#        17 : 'Code_Near_Call',
#        18 : 'Code_Far_Jump',
#        19 : 'Code_Near_Jump',
#        20 : 'Code_User',
#        21 : 'Ordinary_Flow'
#    }
#    return [(x.ea,x.opnum) for x in xl]
项目:idascripts    作者:ctfhacker    | 项目源码 | 文件源码
def loops(func):
    '''Count the simple cycles of the graph ``blocks.nx`` returns for ``func``.'''
    import networkx as nx
    graph = blocks.nx(by(func))
    # Tally the cycles one by one instead of building a list of them.
    return sum(1 for _ in nx.simple_cycles(graph))

# FIXME: document this
#def refs(func, member):
#    xl, fn = idaapi.xreflist_t(), by(func)
#    idaapi.build_stkvar_xrefs(xl, fn, member.ptr)
#    x.ea, x.opnum, x.type
#    ref_types = {
#        0  : 'Data_Unknown',
#        1  : 'Data_Offset',
#        2  : 'Data_Write',
#        3  : 'Data_Read',
#        4  : 'Data_Text',
#        5  : 'Data_Informational',
#        16 : 'Code_Far_Call',
#        17 : 'Code_Near_Call',
#        18 : 'Code_Far_Jump',
#        19 : 'Code_Near_Jump',
#        20 : 'Code_User',
#        21 : 'Ordinary_Flow'
#    }
#    return [(x.ea,x.opnum) for x in xl]
项目:idascripts    作者:ctfhacker    | 项目源码 | 文件源码
def loops(func):
    '''Count the simple cycles of the graph ``blocks.nx`` returns for ``func``.'''
    import networkx as nx
    graph = blocks.nx(by(func))
    # Tally the cycles one by one instead of building a list of them.
    return sum(1 for _ in nx.simple_cycles(graph))

# FIXME: document this
#def refs(func, member):
#    xl, fn = idaapi.xreflist_t(), by(func)
#    idaapi.build_stkvar_xrefs(xl, fn, member.ptr)
#    x.ea, x.opnum, x.type
#    ref_types = {
#        0  : 'Data_Unknown',
#        1  : 'Data_Offset',
#        2  : 'Data_Write',
#        3  : 'Data_Read',
#        4  : 'Data_Text',
#        5  : 'Data_Informational',
#        16 : 'Code_Far_Call',
#        17 : 'Code_Near_Call',
#        18 : 'Code_Far_Jump',
#        19 : 'Code_Near_Jump',
#        20 : 'Code_User',
#        21 : 'Ordinary_Flow'
#    }
#    return [(x.ea,x.opnum) for x in xl]
项目:fuzzer    作者:shellphish    | 项目源码 | 文件源码
def _remove_cycles(self):
        """
        Really hacky way to remove cycles in hierarchies.

        Builds the graph with ``self.make_graph()`` and looks for a simple
        cycle. If one is found, its first node is flagged ``looped`` and its
        ``sources`` list is emptied in place.

        Returns True when a cycle was found and handled, False when the
        graph has no cycle.
        """

        G = self.make_graph()
        # Only the first cycle is used, so fetch it lazily instead of
        # enumerating every simple cycle of the graph.
        first_cycle = next(iter(networkx.simple_cycles(G)), None)
        if first_cycle is None:
            return False

        head = first_cycle[0]
        head.looped = True
        head.sources[:] = [ ]
        return True
项目:qtk-python    作者:gouthambs    | 项目源码 | 文件源码
def _compile_node_creator_list(self):
        """Return ``(node, creator)`` pairs in DFS post-order, root excluded.

        ``creator`` is the node's ``'creator'`` attribute, or None when the
        node has none. Raises ValueError listing the cycles when the
        dependency graph is cyclic.
        """
        graph = self._graph
        found = list(nx.simple_cycles(graph))
        if found:
            raise ValueError("Found cycles in dependencies "+str(found))

        ordered = list(nx.dfs_postorder_nodes(graph))
        ordered.remove(self._ROOT)  # exclude root
        return [(name, graph.node[name].get('creator')) for name in ordered]
项目:k-evo    作者:offbit    | 项目源码 | 文件源码
def valid_graph(self, g):
        """Return True when ``g`` contains no simple cycle, False otherwise."""
        # One cycle is enough to decide, so stop at the first instead of
        # enumerating all of them.
        return next(iter(nx.simple_cycles(g)), None) is None
项目:capillary    作者:celery-capillary    | 项目源码 | 文件源码
def build_tree(self, tasks):
        """Accepts any number of tasks as returned by _get_pipeline.

        :param tasks: dictionary of str:info where str is the name of the task, info is from the registry

        :returns: Graph containing each node connected by dependencies, "after: ALL" nodes will be ignored
        :rtype: networkx.DiGraph

        :raises: :exc:`DependencyError` when a declared dependency is not a
            registered task or when the dependencies are circular
        """
        # Set of every registered task name, for O(1) membership tests
        all_task_names = {task for tasks_subset in self.registry.values() for task in tasks_subset}

        # "after: ALL" tasks are ignored; filter them out once for both passes
        selected = [(name, info) for name, info in tasks.items() if info['after'] is not ALL]

        # Find dependencies - directed graph of node names
        tree = nx.DiGraph()

        # Add nodes (all of them first, the edge pass tests node membership)
        for name, info in selected:
            tree.add_node(name, info=info)

        # Add edges
        for name, info in selected:
            for req in info['after']:
                if req not in all_task_names:
                    msg = '"{}" pipeline element was not found, but it is declared as dependency of the pipeline "{}" with arguments "{}"'
                    raise DependencyError(msg.format(req, name, info))
                if req in tree:  # don't add an edge if dependency is not part of the current set of tasks
                    tree.add_edge(req, name)

        # Not as useful as it originally seemed
        # tree = prune_edges(tree)

        # Check for circular dependencies. The next() builtin is used because
        # the generator method .next() only exists on Python 2.
        cycle = next(iter(nx.simple_cycles(tree)), None)
        if cycle is not None:
            raise DependencyError('Circular dependencies detected: {}'.format(cycle))

        # Joins (merge multiple tasks) have more than one edge in
        # joins = [n for n, d in tree.in_degree_iter() if d > 1]

        # Don't support joins right now, one reducer at the end of the chain
        # if joins:
        #    raise DependencyError('Multiple after values not currently supported joins="{}"'.format(joins))

        # TODO - even with joins this could be a challenge
        # Can't handle "N" shapes, so check for those
        # Descendants of forks, cannot join from outside.
        return tree