Python networkx 模块,MultiDiGraph() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用networkx.MultiDiGraph()

项目:pybel    作者:pybel    | 项目源码 | 文件源码
def test_flatten_edges(self):
        """Nested edge data must come back flattened with underscore-joined keys."""
        nested = nx.MultiDiGraph()
        nested.add_edge(1, 2, key=5, attr_dict={'A': 'a', 'B': {'C': 'c', 'D': 'd'}})

        flattened = pybel.utils.flatten_graph_data(nested)

        reference = nx.MultiDiGraph()
        reference.add_edge(1, 2, key=5, attr_dict={'A': 'a', 'B_C': 'c', 'B_D': 'd'})

        # The node sets and the keyed edge sets have to agree before the
        # per-edge data dictionaries are compared.
        self.assertEqual(set(flattened.nodes()), set(reference.nodes()))
        self.assertEqual(
            set(flattened.edges(keys=True)),
            set(reference.edges(keys=True)),
        )

        for source, target, key in reference.edges(keys=True):
            self.assertEqual(reference[source][target][key], flattened[source][target][key])
项目:pybel    作者:pybel    | 项目源码 | 文件源码
def to_graphml(graph, file):
    """Serialize a graph to GraphML through :func:`networkx.write_graphml`.

    Each node's data dictionary is stored as one JSON string under the ``json``
    node attribute, and each edge's data dictionary is passed through
    ``flatten_dict`` before being attached to the exported edge. The .graphml
    file extension is suggested so Cytoscape can recognize the output.

    :param BELGraph graph: A BEL graph
    :param file file: A file or file-like object
    """
    export = nx.MultiDiGraph()

    for node, node_data in graph.nodes(data=True):
        serialized = json.dumps(node_data)
        export.add_node(node, json=serialized)

    for source, target, key, edge_data in graph.edges(data=True, keys=True):
        flat_data = flatten_dict(edge_data)
        export.add_edge(source, target, key=key, attr_dict=flat_data)

    nx.write_graphml(export, file)
项目:pybel    作者:pybel    | 项目源码 | 文件源码
def flatten_graph_data(graph):
    """Build a new graph whose edge data dictionaries have been flattened.

    Graph-level attributes are passed to the new graph's constructor, node data
    is handed over unchanged, and every edge's data goes through
    ``flatten_dict``.

    :param nx.MultiDiGraph graph: A graph with nested edge data dictionaries
    :return: A graph with flattened edge data dictionaries
    :rtype: nx.MultiDiGraph
    """
    flattened = nx.MultiDiGraph(**graph.graph)

    for node, node_data in graph.nodes(data=True):
        flattened.add_node(node, node_data)

    for source, target, key, edge_data in graph.edges(data=True, keys=True):
        flat_data = flatten_dict(edge_data)
        flattened.add_edge(source, target, key=key, attr_dict=flat_data)

    return flattened
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def nx_graph(self):
        """Build a networkx ``MultiDiGraph`` from this object's node list.

        The indices ``1 .. len(self.nodes) - 1`` become the graph nodes. For
        every index whose head (``self._hd``) is truthy, an edge tuple
        ``(index, head, relation)`` is added, with the relation taken from
        ``self._rel``. The ``'word'`` entry of each node is recorded in
        ``self.nx_labels``, keyed by index.
        """
        import networkx

        indices = list(range(1, len(self.nodes)))

        labelled_edges = []
        for index in indices:
            # Indices with a falsy head contribute no edge.
            if self._hd(index):
                labelled_edges.append(
                    (index, self._hd(index), self._rel(index))
                )

        self.nx_labels = {}
        for index in indices:
            self.nx_labels[index] = self.nodes[index]['word']

        graph = networkx.MultiDiGraph()
        graph.add_nodes_from(indices)
        graph.add_edges_from(labelled_edges)
        return graph
项目:revex    作者:lucaswiman    | 项目源码 | 文件源码
def _acceptable_subgraph(self):  # type:  () -> nx.MultiDiGraph
        """
        Return the subgraph of states that are reachable from the start state
        and from which some reachable accepting state can be reached.
        """
        full_graph = self.as_multidigraph
        reachable = nx.descendants(full_graph, self.start) | {self.start}
        graph = full_graph.subgraph(reachable)
        accepting = {
            state for state in graph.node if graph.node[state]['accepting']
        }
        reachable_accepting = reachable & accepting

        # A single artificial "sink" gets an in-edge from every accepting
        # state, because the networkx API makes it easier to ask for the
        # ancestors of one node than of a set of nodes.
        sink = object()
        graph.add_node(sink)
        for state in reachable_accepting:
            graph.add_edge(state, sink)

        acceptable_states = nx.ancestors(graph, sink)
        return graph.subgraph(acceptable_states)
项目:revex    作者:lucaswiman    | 项目源码 | 文件源码
def live_subgraph(self):  # type: () -> nx.MultiDiGraph
        """
        Build the subgraph of "live" states: the start state plus every state
        that is both reachable from the start state and an ancestor of an
        accepting state, i.e. states that may take part in positively matching
        a string.

        Meant for display: only paths that might lead to an accepting state
        are kept, or just the start state when there are none.
        """
        graph = self.as_multidigraph
        accepting = set()
        for node in graph.node:
            if graph.node[node]['accepting']:
                accepting.add(node)

        # A single artificial "sink" gets an in-edge from every accepting
        # state, because the networkx API makes it easier to ask for the
        # ancestors of one node than of a set of nodes.
        sink = object()
        graph.add_node(sink)
        for state in accepting:
            graph.add_edge(state, sink)

        can_accept = nx.ancestors(graph, sink)
        reachable = nx.descendants(graph, self.start)
        live_states = {self.start} | (can_accept & reachable)
        return graph.subgraph(live_states)
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda    作者:SignalMedia    | 项目源码 | 文件源码
def nx_graph(self):
        """Build a networkx ``MultiDiGraph`` from this object's node list.

        The indices ``1 .. len(self.nodes) - 1`` become the graph nodes. For
        every index whose head (``self._hd``) is truthy, an edge tuple
        ``(index, head, relation)`` is added, with the relation taken from
        ``self._rel``. The ``'word'`` entry of each node is recorded in
        ``self.nx_labels``, keyed by index.
        """
        import networkx

        indices = list(range(1, len(self.nodes)))

        labelled_edges = []
        for index in indices:
            # Indices with a falsy head contribute no edge.
            if self._hd(index):
                labelled_edges.append(
                    (index, self._hd(index), self._rel(index))
                )

        self.nx_labels = {}
        for index in indices:
            self.nx_labels[index] = self.nodes[index]['word']

        graph = networkx.MultiDiGraph()
        graph.add_nodes_from(indices)
        graph.add_edges_from(labelled_edges)
        return graph
项目:parsemis_wrapper    作者:tomkdickinson    | 项目源码 | 文件源码
def test_multidigraph_is_subgraph_without_labels(self):
        """Unlabelled single-edge graphs are subgraphs only if the edge exists in G."""

        def single_edge_graph(source, target):
            # Helper: a MultiDiGraph holding exactly one unlabelled edge.
            candidate = nx.MultiDiGraph()
            candidate.add_edge(source, target)
            return candidate

        G = nx.MultiDiGraph()
        G.add_edges_from([(1, 2), (2, 3), (3, 4), (2, 4)])

        valid_subgraph_a = single_edge_graph(1, 2)
        valid_subgraph_b = single_edge_graph(3, 4)
        invalid_subgraph_a = single_edge_graph(1, 4)

        for present in (valid_subgraph_a, valid_subgraph_b):
            self.assertTrue(ParsemisMiner.is_subgraph(G, present))

        self.assertFalse(ParsemisMiner.is_subgraph(G, invalid_subgraph_a))
项目:neighborhood_mood_aws    作者:jarrellmark    | 项目源码 | 文件源码
def nx_graph(self):
        """Build a networkx ``MultiDiGraph`` from this object's node list.

        The indices ``1 .. len(self.nodes) - 1`` become the graph nodes. For
        every index whose head (``self._hd``) is truthy, an edge tuple
        ``(index, head, relation)`` is added, with the relation taken from
        ``self._rel``. The ``'word'`` entry of each node is recorded in
        ``self.nx_labels``, keyed by index.
        """
        import networkx

        indices = list(range(1, len(self.nodes)))

        labelled_edges = []
        for index in indices:
            # Indices with a falsy head contribute no edge.
            if self._hd(index):
                labelled_edges.append(
                    (index, self._hd(index), self._rel(index))
                )

        self.nx_labels = {}
        for index in indices:
            self.nx_labels[index] = self.nodes[index]['word']

        graph = networkx.MultiDiGraph()
        graph.add_nodes_from(indices)
        graph.add_edges_from(labelled_edges)
        return graph
项目:gtfspy    作者:CxAalto    | 项目源码 | 文件源码
def combined_stop_to_stop_transit_network(gtfs, start_time_ut=None, end_time_ut=None):
    """
    Merge the stop-to-stop networks of all transit route types into one graph.

    One network is computed per entry of ``route_types.TRANSIT_ROUTE_TYPES``
    and its edges and nodes are copied into a single multigraph, so a pair of
    stops can be linked once per travel mode. Walk mode is not included.

    Parameters
    ----------
    gtfs: gtfspy.GTFS
    start_time_ut:
        Forwarded unchanged to ``stop_to_stop_network_for_route_type``.
    end_time_ut:
        Forwarded unchanged to ``stop_to_stop_network_for_route_type``.

    Returns
    -------
    net: networkx.MultiDiGraph
        Every edge's data dictionary carries a ``'route_type'`` entry holding
        the route type it came from. Edges are added without explicit keys.
    """
    combined = networkx.MultiDiGraph()
    for route_type in route_types.TRANSIT_ROUTE_TYPES:
        mode_graph = stop_to_stop_network_for_route_type(
            gtfs,
            route_type,
            start_time_ut=start_time_ut,
            end_time_ut=end_time_ut,
        )
        # Tag the per-mode edge data in place before copying the edges over.
        for _, _, edge_data in mode_graph.edges(data=True):
            edge_data['route_type'] = route_type
        combined.add_edges_from(mode_graph.edges(data=True))
        combined.add_nodes_from(mode_graph.nodes(data=True))
    return combined
项目:python_cypher    作者:zacernst    | 项目源码 | 文件源码
def main():
    """Run a sample CREATE query against an empty graph, then print the
    results of a sample MATCH query against the same graph.
    """
    # Alternative sample queries kept for manual experimentation:
    # sample = ','.join(['MATCH (x:SOMECLASS {bar : "baz"',
    #                    'foo:"goo"})<-[:WHATEVER]-(:ANOTHERCLASS)',
    #                    '(y:LASTCLASS) RETURN x.foo, y'])

    # create = ('CREATE (n:SOMECLASS {foo: "bar", bar: {qux: "baz"}})'
    #           '-[e:EDGECLASS]->(m:ANOTHERCLASS) RETURN n')
    # create = 'CREATE (n:SOMECLASS {foo: "bar", qux: "baz"}) RETURN n'
    create_query = ('CREATE (n:SOMECLASS {foo: {goo: "bar"}})'
                    '-[e:EDGECLASS]->(m:ANOTHERCLASS {qux: "foobar", bar: 10}) '
                    'RETURN n')
    test_query = ('MATCH (n:SOMECLASS {foo: {goo: "bar"}})-[e:EDGECLASS]->'
                  '(m:ANOTHERCLASS) WHERE '
                  'm.bar = 10 '
                  'RETURN n.foo.goo, m.qux, e')
    # atomic_facts = extract_atomic_facts(test_query)
    graph_object = nx.MultiDiGraph()
    my_parser = CypherToNetworkx()
    for _ in my_parser.query(graph_object, create_query):
        pass  # a generator, we need to loop over results to run.
    for i in my_parser.query(graph_object, test_query):
        # Function-call form prints the same text on Python 2 and, unlike the
        # print statement, is also valid syntax on Python 3.
        print(i)
项目:hate-to-hugs    作者:sdoran35    | 项目源码 | 文件源码
def nx_graph(self):
        """Build a networkx ``MultiDiGraph`` from this object's node list.

        The indices ``1 .. len(self.nodes) - 1`` become the graph nodes. For
        every index whose head (``self._hd``) is truthy, an edge tuple
        ``(index, head, relation)`` is added, with the relation taken from
        ``self._rel``. The ``'word'`` entry of each node is recorded in
        ``self.nx_labels``, keyed by index.
        """
        import networkx

        indices = list(range(1, len(self.nodes)))

        labelled_edges = []
        for index in indices:
            # Indices with a falsy head contribute no edge.
            if self._hd(index):
                labelled_edges.append(
                    (index, self._hd(index), self._rel(index))
                )

        self.nx_labels = {}
        for index in indices:
            self.nx_labels[index] = self.nodes[index]['word']

        graph = networkx.MultiDiGraph()
        graph.add_nodes_from(indices)
        graph.add_edges_from(labelled_edges)
        return graph
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def build_layer2_graph(related_extra=None):
    """Build a graph of the layer 2 topology stored in the NAV database.

    Every ``Interface`` whose ``to_netbox`` is set contributes one edge from
    its own netbox to the neighbouring netbox. The neighbour is the netbox of
    ``to_interface`` when that is set, otherwise ``to_netbox``.

    :param related_extra: Additional select_related fields, appended to the
                          default ``netbox``, ``to_netbox``, ``to_interface``.

    :returns: A MultiDiGraph of Netbox nodes, with each edge keyed by the
              Interface model object it was built from.

    """
    topology = nx.MultiDiGraph(name="Layer 2 topology")

    related_fields = ('netbox', 'to_netbox', 'to_interface')
    if related_extra:
        related_fields += related_extra

    connected = Interface.objects.filter(to_netbox__isnull=False)

    for link in connected.select_related(*related_fields):
        if link.to_interface:
            neighbour = link.to_interface.netbox
        else:
            neighbour = link.to_netbox
        topology.add_edge(link.netbox, neighbour, key=link)
    return topology
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_reduce_simple_case_cam(self):
        """Two switches with crossing "cam" edges reduce to a port-to-port link."""
        graph = nx.MultiDiGraph(name="simple case cam")
        # Ownership edges: each switch points at its own port.
        for switch, port in ((self.switch_a, self.switch_port_a),
                             (self.switch_b, self.switch_port_b)):
            graph.add_edge(switch, port)
        # "cam" edges: each port points at the opposite switch.
        for port, remote_switch in ((self.switch_port_a, self.switch_b),
                                    (self.switch_port_b, self.switch_a)):
            graph.add_edge(port, remote_switch, "cam")

        reducer = AdjacencyReducer(graph)
        print("input:")
        print(reducer.format_connections())
        reducer.reduce()
        print("result:")
        print(reducer.format_connections())

        reduced = reducer.graph
        assert reduced.has_edge(self.switch_port_a, self.switch_port_b)
        assert reduced.has_edge(self.switch_port_b, self.switch_port_a)
        for port in (self.switch_port_a, self.switch_port_b):
            assert reduced.out_degree(port) == 1
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_reduce_simple_case_lldp(self):
        """Two ports with mutual "lldp" edges stay directly connected after reduction."""
        graph = nx.MultiDiGraph(name="simple case lldp")
        # Ownership edges: each switch points at its own port.
        for switch, port in ((self.switch_a, self.switch_port_a),
                             (self.switch_b, self.switch_port_b)):
            graph.add_edge(switch, port)
        # "lldp" edges: the two ports point at each other.
        for port, remote_port in ((self.switch_port_a, self.switch_port_b),
                                  (self.switch_port_b, self.switch_port_a)):
            graph.add_edge(port, remote_port, "lldp")

        reducer = AdjacencyReducer(graph)
        print("input:")
        print(reducer.format_connections())
        reducer.reduce()
        print("result:")
        print(reducer.format_connections())

        reduced = reducer.graph
        assert reduced.has_edge(self.switch_port_a, self.switch_port_b)
        assert reduced.has_edge(self.switch_port_b, self.switch_port_a)
        for port in (self.switch_port_a, self.switch_port_b):
            assert reduced.out_degree(port) == 1
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_reduce_simple_tree_lldp(self):
        """A router with two switch uplinks keeps one "lldp" link per port pair."""
        graph = nx.MultiDiGraph(name="simple tree lldp")
        # Ownership edges: every device points at its own port(s).
        ownership = (
            (self.router, self.router_port_a),
            (self.router, self.router_port_b),
            (self.switch_a, self.switch_port_a),
            (self.switch_b, self.switch_port_b),
        )
        for device, port in ownership:
            graph.add_edge(device, port)
        # "lldp" edges in both directions between each switch port and the
        # router port it is paired with.
        neighbours = (
            (self.switch_port_a, self.router_port_a),
            (self.switch_port_b, self.router_port_b),
            (self.router_port_a, self.switch_port_a),
            (self.router_port_b, self.switch_port_b),
        )
        for port, remote_port in neighbours:
            graph.add_edge(port, remote_port, "lldp")

        reducer = AdjacencyReducer(graph)
        print("input:")
        print(reducer.format_connections())
        reducer.reduce()
        print("result:")
        print(reducer.format_connections())

        reduced = reducer.graph
        for port, remote_port in neighbours:
            assert reduced.has_edge(port, remote_port)
        for port in (self.switch_port_a, self.switch_port_b,
                     self.router_port_a, self.router_port_b):
            assert reduced.out_degree(port) == 1
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def test_no_return_path(self):
        """A one-way "cam" edge survives reduction; the unreferenced port is dropped."""
        graph = nx.MultiDiGraph()
        # Ownership edges: each switch points at its own port.
        for switch, port in ((self.switch_a, self.switch_port_a),
                             (self.switch_b, self.switch_port_b)):
            graph.add_edge(switch, port)
        # Only port A has a "cam" edge; nothing leads back from switch B's side.
        graph.add_edge(self.switch_port_a, self.switch_b, "cam")

        reducer = AdjacencyReducer(graph)
        print("input:")
        print(reducer.format_connections())
        reducer.reduce()
        print("result:")
        print(reducer.format_connections())

        reduced = reducer.graph
        assert reduced.has_edge(self.switch_port_a, self.switch_b)
        assert not reduced.has_edge(self.switch_port_b, self.switch_port_a)
        assert reduced.out_degree(self.switch_port_a) == 1
        assert self.switch_port_b not in reduced
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def nx_graph(self):
        """Build a networkx ``MultiDiGraph`` from this object's node list.

        The indices ``1 .. len(self.nodes) - 1`` become the graph nodes. For
        every index whose head (``self._hd``) is truthy, an edge tuple
        ``(index, head, relation)`` is added, with the relation taken from
        ``self._rel``. The ``'word'`` entry of each node is recorded in
        ``self.nx_labels``, keyed by index.
        """
        import networkx

        indices = list(range(1, len(self.nodes)))

        labelled_edges = []
        for index in indices:
            # Indices with a falsy head contribute no edge.
            if self._hd(index):
                labelled_edges.append(
                    (index, self._hd(index), self._rel(index))
                )

        self.nx_labels = {}
        for index in indices:
            self.nx_labels[index] = self.nodes[index]['word']

        graph = networkx.MultiDiGraph()
        graph.add_nodes_from(indices)
        graph.add_edges_from(labelled_edges)
        return graph
项目:beepboop    作者:nicolehe    | 项目源码 | 文件源码
def nx_graph(self):
        """Build a networkx ``MultiDiGraph`` from this object's node list.

        The indices ``1 .. len(self.nodes) - 1`` become the graph nodes. For
        every index whose head (``self._hd``) is truthy, an edge tuple
        ``(index, head, relation)`` is added, with the relation taken from
        ``self._rel``. The ``'word'`` entry of each node is recorded in
        ``self.nx_labels``, keyed by index.
        """
        import networkx

        indices = list(range(1, len(self.nodes)))

        labelled_edges = []
        for index in indices:
            # Indices with a falsy head contribute no edge.
            if self._hd(index):
                labelled_edges.append(
                    (index, self._hd(index), self._rel(index))
                )

        self.nx_labels = {}
        for index in indices:
            self.nx_labels[index] = self.nodes[index]['word']

        graph = networkx.MultiDiGraph()
        graph.add_nodes_from(indices)
        graph.add_edges_from(labelled_edges)
        return graph
项目:kind2anki    作者:prz3m    | 项目源码 | 文件源码
def nx_graph(self):
        """Build a networkx ``MultiDiGraph`` from this object's node list.

        The indices ``1 .. len(self.nodes) - 1`` become the graph nodes. For
        every index whose head (``self._hd``) is truthy, an edge tuple
        ``(index, head, relation)`` is added, with the relation taken from
        ``self._rel``. The ``'word'`` entry of each node is recorded in
        ``self.nx_labels``, keyed by index.
        """
        import networkx

        indices = list(range(1, len(self.nodes)))

        labelled_edges = []
        for index in indices:
            # Indices with a falsy head contribute no edge.
            if self._hd(index):
                labelled_edges.append(
                    (index, self._hd(index), self._rel(index))
                )

        self.nx_labels = {}
        for index in indices:
            self.nx_labels[index] = self.nodes[index]['word']

        graph = networkx.MultiDiGraph()
        graph.add_nodes_from(indices)
        graph.add_edges_from(labelled_edges)
        return graph
项目:but_sentiment    作者:MixedEmotions    | 项目源码 | 文件源码
def nx_graph(self):
        """Build a networkx ``MultiDiGraph`` from this object's node list.

        The indices ``1 .. len(self.nodes) - 1`` become the graph nodes. For
        every index whose head (``self._hd``) is truthy, an edge tuple
        ``(index, head, relation)`` is added, with the relation taken from
        ``self._rel``. The ``'word'`` entry of each node is recorded in
        ``self.nx_labels``, keyed by index.
        """
        import networkx

        indices = list(range(1, len(self.nodes)))

        labelled_edges = []
        for index in indices:
            # Indices with a falsy head contribute no edge.
            if self._hd(index):
                labelled_edges.append(
                    (index, self._hd(index), self._rel(index))
                )

        self.nx_labels = {}
        for index in indices:
            self.nx_labels[index] = self.nodes[index]['word']

        graph = networkx.MultiDiGraph()
        graph.add_nodes_from(indices)
        graph.add_edges_from(labelled_edges)
        return graph
项目:isambard    作者:woolfson-group    | 项目源码 | 文件源码
def filter_graph(g, cutoff=7.0, min_kihs=2):
        """ Build a graph from the edges of g whose max_kh_distance <= cutoff.

        Parameters
        ----------
        g : MultiDiGraph representing KIHs
            g is the output from graph_from_protein
        cutoff : float
            Socket cutoff in Angstroms. Edges whose ``'kih'`` object has
            ``max_kh_distance`` at or below this value are kept.
            Default is 7.0.
        min_kihs : int
            When positive, only nodes that appear in some ordered
            (source, target) pair connected by more than ``min_kihs`` kept
            edges are retained, and an edge is kept only if both of its
            endpoints are retained. When zero or negative this step is skipped.

        Returns
        -------
        networkx.MultiDiGraph
            New graph constructed from the surviving
            (source, target, key, data) edge tuples.
        """
        kept_edges = [
            edge for edge in g.edges(keys=True, data=True)
            if edge[3]['kih'].max_kh_distance <= cutoff
        ]
        if min_kihs > 0:
            pair_counts = Counter((edge[0], edge[1]) for edge in kept_edges)
            # Nodes taking part in at least one pair with > min_kihs edges.
            well_connected = set()
            for pair, count in pair_counts.items():
                if count > min_kihs:
                    well_connected.update(pair)
            kept_edges = [
                edge for edge in kept_edges
                if edge[0] in well_connected and edge[1] in well_connected
            ]
        return networkx.MultiDiGraph(kept_edges)
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def __init__(self, root_path):
        """Set up empty containers and remember ``root_path`` as a ``Path``."""
        self.root_path = Path(root_path)

        # Graph and bookkeeping containers all start out empty.
        self.code_graph = nx.MultiDiGraph()
        self.metrics = {}
        self.root_arch_ids = []

        # Sorted sets of the entity and reference kinds.
        self.entity_kinds = SortedSet()
        self.ref_kinds = SortedSet()
项目:geomdn    作者:afshinrahimi    | 项目源码 | 文件源码
def projected_graph(B, nodes, multigraph=False):
    """Project graph ``B`` onto ``nodes``, logging progress along the way.

    Two nodes of ``nodes`` are joined in the result when they share at least
    one neighbour in ``B``. With ``multigraph=True`` one edge is added per
    shared neighbour, keyed by that neighbour; otherwise a single edge is
    added. The result is directed exactly when ``B`` is directed, and it
    receives ``B``'s graph attributes and the data of the projected nodes.

    :param B: The input graph; must not be a multigraph.
    :param nodes: Iterable of nodes of ``B`` to project onto.
    :param multigraph: Whether to create one keyed edge per shared neighbour.
    :returns: A ``Graph``/``DiGraph``, or ``MultiGraph``/``MultiDiGraph`` when
              ``multigraph`` is true.
    :raises nx.NetworkXError: If ``B`` is a multigraph.
    """
    if B.is_multigraph():
        raise nx.NetworkXError("not defined for multigraphs")
    directed = B.is_directed()
    if directed:
        G = nx.MultiDiGraph() if multigraph else nx.DiGraph()
    else:
        G = nx.MultiGraph() if multigraph else nx.Graph()
    G.graph.update(B.graph)

    # Materialise once: a one-shot iterable would otherwise be exhausted by
    # add_nodes_from and leave nothing to project below.
    nodes = list(nodes)
    G.add_nodes_from((n, B.node[n]) for n in nodes)
    nodes = set(nodes)

    total = len(nodes)
    # Log about every 10% of the nodes. The step is at least 1 so that fewer
    # than ten nodes cannot cause a modulo-by-zero.
    step = max(1, total // 10)
    for i, u in enumerate(nodes):
        if i % step == 0:
            logging.info("%d%%", 100 * i // total)
        two_hop = set(v for nbr in B[u] for v in B[nbr])
        nbrs2 = two_hop & (nodes - set([u]))
        if multigraph:
            for n in nbrs2:
                if directed:
                    links = set(B[u]) & set(B.pred[n])
                else:
                    links = set(B[u]) & set(B[n])
                for l in links:
                    if not G.has_edge(u, n, l):
                        G.add_edge(u, n, key=l)
        else:
            G.add_edges_from((u, n) for n in nbrs2)
    return G
项目:pybel    作者:pybel    | 项目源码 | 文件源码
def test_dict_matches_graph(self):
        """A query dict matches when one of the parallel edges carries it."""
        graph = nx.MultiDiGraph()

        for node in (1, 2):
            graph.add_node(node)
        for relation in ('yup', 'nope'):
            graph.add_edge(1, 2, relation=relation)

        query = {'relation': 'yup'}

        self.assertTrue(any_subdict_matches(graph.edge[1][2], query))
项目:ccc_helper    作者:TimothyZhang    | 项目源码 | 文件源码
def create_project_graph(project):
    """
    Create an empty graph and fill it from all assets of the project.

    :param Project project:
    :rtype: nx.MultiDiGraph
    """
    project_graph = nx.MultiDiGraph()
    # The assets are materialised into a list before being added.
    add_assets_to_graph(project_graph, list(project.iterate_assets()))
    return project_graph
项目:home-assistant-graph    作者:happyleavesaoc    | 项目源码 | 文件源码
def new_graph():
    """Get a clean graph.

    The graph-level attributes are initialised from ``ATTR_GRAPH`` (defined
    outside this block). A copy of that mapping is used, so adding or
    replacing a graph attribute on one graph no longer changes ``ATTR_GRAPH``
    or other graphs. The copy is shallow: nested values are still shared.
    """
    graph = networkx.MultiDiGraph()
    graph.graph = dict(ATTR_GRAPH)
    return graph
项目:qiskit-sdk-py    作者:QISKit    | 项目源码 | 文件源码
def __init__(self):
        """Create an empty circuit."""
        # wire name (reg,idx) -> Bool; True marks a classical bit,
        # False marks a qubit.
        self.wire_type = {}

        # wire name (reg,idx) -> input node of the graph
        self.input_map = {}

        # wire name (reg,idx) -> output node of the graph
        self.output_map = {}

        # Total number of nodes created so far.
        self.node_counter = 0

        # Named operations of this circuit with their signatures. A
        # signature is an integer tuple (nq,nc,np): the number of input
        # qubits, input bits, and real parameters. The definition itself
        # lives outside the circuit object.
        self.basis = {}

        # Directed multigraph of input, output and operation nodes.
        # - Operation nodes have equal in- and out-degrees and carry extra
        #   data about the operation, including the argument order and
        #   parameter values.
        # - Input nodes have out-degree 1; output nodes have in-degree 1.
        # - Edges carry wire labels (reg,idx); each operation has matching
        #   in- and out-edges with the same wire labels.
        self.multi_graph = nx.MultiDiGraph()

        # qreg -> size
        self.qregs = {}

        # creg -> size
        self.cregs = {}

        # user defined gate -> ast node defining it
        self.gates = {}

        # Precision used when printing floats.
        self.prec = 10
项目:roadmap-processing    作者:aicenter    | 项目源码 | 文件源码
def compute_centrality(graph: nx.MultiDiGraph, data: geojson.feature.FeatureCollection, edge_map: Dict):
    """Annotate every feature with the degrees of its edge's two endpoints.

    For each feature, the edge is looked up in ``edge_map`` by the feature's
    ``id`` property; the graph degree of the edge's first and second node is
    written to the ``from_degree`` and ``to_degree`` properties. ``data`` is
    modified in place and nothing is returned.
    """
    for feature in data['features']:
        endpoints = edge_map[feature['properties']['id']]
        source_degree = graph.degree(endpoints[0])
        target_degree = graph.degree(endpoints[1])
        feature['properties']["from_degree"] = source_degree
        feature['properties']["to_degree"] = target_degree
项目:roadmap-processing    作者:aicenter    | 项目源码 | 文件源码
def _create_edge_map(graph: nx.MultiDiGraph) -> Dict:
    """Map the ``id`` attribute of every edge to its ``(u, v)`` node pair.

    Each edge's own data dictionary is read, so parallel edges between the
    same two nodes each get their own entry instead of all resolving to the
    edge stored under key ``0``.

    :param graph: Graph whose edges all carry an ``id`` attribute.
    :return: Dictionary from edge id to the ``(u, v)`` tuple of that edge.
    :raises KeyError: If an edge has no ``id`` attribute.
    """
    return {data["id"]: (u, v) for u, v, data in graph.edges(data=True)}
项目:roadmap-processing    作者:aicenter    | 项目源码 | 文件源码
def graph_multi_test(graph: nx.MultiDiGraph):
    """Walk all edges and detect node pairs joined by more than one edge.

    The detection has no visible effect: the branch only assigns a throwaway
    local (presumably a spot for a debugger breakpoint -- TODO confirm).
    """
    for source, target in graph.edges():
        parallel_edges = graph[source][target]
        if len(parallel_edges) > 1:
            a = 1
项目:roadmap-processing    作者:aicenter    | 项目源码 | 文件源码
def _load_graph(json_dict: dict) -> nx.MultiDiGraph:
    """Build a graph with one edge per feature of ``json_dict``.

    An edge runs from the node of the feature's first coordinate to the node
    of its last coordinate (both obtained through ``get_node``) and carries
    the feature's ``id`` and ``lanes`` plus ``others=[[]]``. Features whose
    end nodes are equal and that have exactly two coordinates are skipped.
    """
    graph = nx.MultiDiGraph()
    for feature in json_dict['features']:
        coordinates = feature['geometry']['coordinates']
        start = get_node(coordinates[0])
        end = get_node(coordinates[-1])
        # prune loops without any purpose, save loops like traffic roundabout
        keep = start != end or len(coordinates) != 2
        if not keep:
            continue
        properties = feature['properties']
        lanes = properties['lanes']
        graph.add_edge(start, end, id=feature['properties']['id'], others=[[]], lanes=lanes)
    return graph
项目:roadmap-processing    作者:aicenter    | 项目源码 | 文件源码
def simplify_graph(g: nx.MultiDiGraph, check_lanes):
    """Run the one-way and then the two-way simplification over the graph.

    Each pass iterates over a snapshot of the node list taken when the pass
    starts, and tests the node degrees at the moment the node is visited.
    """
    # Pass 1: nodes with exactly one incoming and one outgoing edge (oneways).
    first_pass_nodes = [node for node, _ in g.adjacency()]
    for node in first_pass_nodes:
        if g.out_degree(node) == 1 and g.in_degree(node) == 1:
            simplify_oneways(node, g, check_lanes)

    # Pass 2: nodes with two edges each way (both directions in highway).
    second_pass_nodes = [node for node, _ in g.adjacency()]
    for node in second_pass_nodes:
        if g.out_degree(node) == 2 and g.in_degree(node) == 2:
            simplify_twoways(node, g, check_lanes)
项目:roadmap-processing    作者:aicenter    | 项目源码 | 文件源码
def load_graph(json_dict):
    """Build a ``MultiDiGraph`` with one edge per feature of ``json_dict``.

    An edge runs from the node of the feature's first coordinate to the node
    of its last coordinate (both obtained through ``get_node``). It carries
    the feature's ``id`` and ``lanes``, and the coordinates between the two
    end points as ``others``. Features whose end nodes are equal and that
    have exactly two coordinates are skipped.
    """
    graph = nx.MultiDiGraph()
    for feature in json_dict['features']:
        coordinates = feature['geometry']['coordinates']
        start = get_node(coordinates[0])
        end = get_node(coordinates[-1])
        # prune loops without any purpose, save loops like traffic roundabout
        keep = start != end or len(coordinates) != 2
        if not keep:
            continue
        lanes = feature['properties']['lanes']
        inner_points = feature['geometry']['coordinates'][1:-1]
        if len(inner_points) == 0:
            # An empty slice is replaced by a fresh empty list.
            inner_points = []
        graph.add_edge(start, end, id=feature['properties']['id'], others=inner_points, lanes=lanes)
    return graph
项目:roadmap-processing    作者:aicenter    | 项目源码 | 文件源码
def load_graph(data: geojson.feature.FeatureCollection) -> nx.MultiDiGraph:
    """Build a graph with one edge per feature of ``data``.

    An edge runs from the node of the feature's first coordinate to the node
    of its last coordinate (both obtained through ``_get_node``) and carries
    the feature's ``id`` property. Progress is shown through ``tqdm``.
    """
    graph = nx.MultiDiGraph()
    print_info("Creating networkx graph from geojson")
    for feature in tqdm(data['features'], desc="processing features"):
        coordinates = feature['geometry']['coordinates']
        start = _get_node(coordinates[0])
        end = _get_node(coordinates[-1])
        feature_id = feature['properties']['id']
        graph.add_edge(start, end, id=feature_id)
    return graph
项目:roadmap-processing    作者:aicenter    | 项目源码 | 文件源码
def load_graph(json_dict):
    """Build a ``MultiDiGraph`` with one edge per feature of ``json_dict``.

    An edge runs from the node of the feature's first coordinate to the node
    of its last coordinate (both obtained through ``get_node``) and carries
    the feature's ``id`` property.
    """
    graph = nx.MultiDiGraph()
    for feature in json_dict['features']:
        coordinates = feature['geometry']['coordinates']
        start = get_node(coordinates[0])
        end = get_node(coordinates[-1])
        feature_id = feature['properties']['id']
        graph.add_edge(start, end, id=feature_id)
    return graph
项目:juicer    作者:eubr-bigsea    | 项目源码 | 文件源码
def __init__(self, workflow_data, config):
        """Build the workflow graph and compute the sorted task order.

        Raises ``AttributeError`` unless both
        ``is_there_null_target_id_tasks()`` and
        ``is_there_null_source_id_tasks()`` return a truthy value.
        """
        self.config = config
        self.graph = nx.MultiDiGraph()

        # The workflow dictionary is kept as given.
        self.workflow = workflow_data

        # Populate the graph from the workflow.
        self._build_initial_workflow_graph()
        self._build_privacy_restrictions()

        # Filled below with the tasks sorted according to their dependencies.
        self.sorted_tasks = []

        # Spark or COMPSs; 'spark' when the workflow names no platform slug.
        platform = workflow_data.get('platform', {})
        self.platform = platform.get('slug', 'spark')

        # Both port checks must pass before the tasks can be sorted.
        ports_ok = (self.is_there_null_target_id_tasks()
                    and self.is_there_null_source_id_tasks())
        if not ports_ok:
            raise AttributeError(
                _("Port '{}/{}' must be informed for operation{}").format(
                    self.WORKFLOW_GRAPH_SOURCE_ID_PARAM,
                    self.WORKFLOW_GRAPH_TARGET_ID_PARAM,
                    self.__class__))
        self.sorted_tasks = self.get_topological_sorted_tasks()
项目:cobol-sharp    作者:petli    | 项目源码 | 文件源码
def __init__(self, debug=False):
        """Store the debug flag and start with an empty ``MultiDiGraph``."""
        self._debug = debug
        self.graph = nx.MultiDiGraph()
项目:NetPower_TestBed    作者:Vignesh2208    | 项目源码 | 文件源码
def get_mdg(self):
        """Return a ``MultiDiGraph`` built from ``self.graph`` without host nodes.

        A node counts as a host when ``self.get_node_type`` returns ``"host"``
        for it. ``self.graph`` itself is not modified.
        """
        mdg = nx.MultiDiGraph(self.graph)

        # Decide which nodes are hosts on the original graph, then drop them
        # from the new one.
        host_nodes = [n for n in self.graph if self.get_node_type(n) == "host"]
        for host in host_nodes:
            mdg.remove_node(host)

        return mdg
项目:revex    作者:lucaswiman    | 项目源码 | 文件源码
def as_multidigraph(self):  # type: () -> nx.MultiDiGraph
        """
        Build a fresh MultiDiGraph holding the nodes and edges of self,
        including their data.

        This is a bit of a hack, but allows some useful methods like
        .subgraph() to work correctly.
        """
        duplicate = nx.MultiDiGraph()
        nodes_with_data = self.nodes(data=True)
        duplicate.add_nodes_from(nodes_with_data)
        edges_with_data = self.edges(data=True)
        duplicate.add_edges_from(edges_with_data)
        return duplicate
项目:Android-Repackaged-App-Detection-System    作者:M157q    | 项目源码 | 文件源码
def parse_saaf_json(filename):
    """Load a SAAF JSON report and merge all of its backtracks into one graph.

    The top level of the file maps an invoke to a list of backtracks, and each
    backtrack is a list of node dictionaries. Node keys are lower-cased, and
    the ``nodeid`` and ``parent`` values of every node are shifted by the
    number of nodes seen in earlier backtracks so that ids from different
    backtracks do not collide. The invoke is stored on every node under
    ``'invoke'``.

    :param filename: Path of the JSON file to read.
    :returns: A ``MultiDiGraph`` with one node per shifted ``nodeid`` (the
              node dictionary is its attribute dict) and parent -> node edges.
    """
    with open(filename) as json_file:
        saaf_json = json.load(json_file)

    G = nx.MultiDiGraph()

    # Offset added to the ids of the current backtrack.
    i = 0
    # .items() instead of the Python 2-only .iteritems(): same pairs, and it
    # also runs on Python 3.
    for invoke, invoke_info in saaf_json.items():
        for backtrack in invoke_info:
            for node in backtrack:
                node = dict((k.lower(), v) for k, v in node.items())
                node['parent'] = node['parent'] + i
                node['nodeid'] = node['nodeid'] + i
                node['invoke'] = invoke

                new_node = node['nodeid']
                new_edge = (node['parent'], node['nodeid'])

                if not G.has_node(new_node):
                    G.add_node(new_node, attr_dict=node)

                if node['parent'] == i - 1:
                    # A shifted parent of offset - 1 gets no edge; the stored
                    # parent is rewritten to the offset instead.
                    node['parent'] = i
                elif not G.has_edge(*new_edge):
                    G.add_edge(*new_edge)

            # The next backtrack starts after all nodes of this one.
            i += len(backtrack)

    return G
项目:osmnx    作者:gboeing    | 项目源码 | 文件源码
def gdfs_to_graph(gdf_nodes, gdf_edges):
    """
    Convert node and edge GeoDataFrames into a graph

    The graph's ``crs`` is taken from ``gdf_nodes.crs`` and its ``name`` from
    ``gdf_nodes.gdf_name`` with a trailing ``'_nodes'`` suffix removed.

    Parameters
    ----------
    gdf_nodes : GeoDataFrame
        One row per node, indexed by node id; every column becomes a node
        attribute for the nodes that have a non-null value in it.
    gdf_edges : GeoDataFrame
        One row per edge with ``u``, ``v`` and ``key`` columns; every other
        column becomes an edge attribute when its value is a list or non-null.

    Returns
    -------
    networkx multidigraph
    """

    G = nx.MultiDiGraph()
    G.graph['crs'] = gdf_nodes.crs

    # Remove the suffix as a whole. str.rstrip('_nodes') strips any trailing
    # run of the characters "_", "n", "o", "d", "e", "s" and would, for
    # example, turn "london_nodes" into "l".
    name = gdf_nodes.gdf_name
    suffix = '_nodes'
    if name.endswith(suffix):
        name = name[:-len(suffix)]
    G.graph['name'] = name

    # add the nodes and their attributes to the graph
    G.add_nodes_from(gdf_nodes.index)
    attributes = gdf_nodes.to_dict()
    for attribute_name in gdf_nodes.columns:
        # only add this attribute to nodes which have a non-null value for it
        attribute_values = {k: v for k, v in attributes[attribute_name].items() if pd.notnull(v)}
        nx.set_node_attributes(G, name=attribute_name, values=attribute_values)

    # add the edges and attributes that are not u, v, key (as they're added
    # separately) or null
    for _, row in gdf_edges.iterrows():
        attrs = {}
        # Series.items() replaces Series.iteritems(), which pandas 2.0 removed.
        for label, value in row.items():
            if (label not in ['u', 'v', 'key']) and (isinstance(value, list) or pd.notnull(value)):
                attrs[label] = value
        # The end points are passed positionally because the keyword names of
        # add_edge's first two parameters differ between networkx versions.
        G.add_edge(row['u'], row['v'], key=row['key'], **attrs)

    return G
项目:parsemis_wrapper    作者:tomkdickinson    | 项目源码 | 文件源码
def get_label_from_edge(g, edge, attribute_name='label'):
        """Collect the values of ``attribute_name`` found on an edge.

        The edge is looked up as ``(edge[0], edge[1])``; on a directed graph
        the reversed direction is tried when that lookup finds nothing. For a
        multigraph, every parallel edge that has the attribute contributes
        one entry.

        :param g: The graph to query.
        :param edge: Sequence whose first two items are the edge's end nodes.
        :param attribute_name: Name of the edge attribute to read.
        :returns: List of attribute values; empty when the edge does not
                  exist or no edge data carries the attribute.
        """
        edge_attributes = g.get_edge_data(edge[0], edge[1])
        if edge_attributes is None and nx.is_directed(g):
            edge_attributes = g.get_edge_data(edge[1], edge[0])

        if edge_attributes is None:
            # No such edge in either direction: nothing to report (this used
            # to fail with a TypeError further down).
            return []

        # is_multigraph() also recognises subclasses of the multigraph
        # classes, which an exact type comparison does not.
        if g.is_multigraph():
            return [
                attributes[attribute_name]
                for attributes in edge_attributes.values()
                if attribute_name in attributes
            ]

        if attribute_name in edge_attributes:
            return [edge_attributes[attribute_name]]
        return []
项目:parsemis_wrapper    作者:tomkdickinson    | 项目源码 | 文件源码
def test_multidigraph_is_subgraph_with_labels(self):
        """Labelled single-edge graphs match only on nodes, direction and label."""

        def labelled_edge_graph(source, target, label):
            # Helper: a MultiDiGraph holding exactly one labelled edge.
            candidate = nx.MultiDiGraph()
            candidate.add_edge(source, target, label=label)
            return candidate

        G = nx.MultiDiGraph()
        G.add_edges_from(
            [
                (1, 2, {'label': 'a'}),
                (1, 2, {'label': 'f'}),
                (2, 3, {'label': 'c'}),
                (3, 4, {'label': 'd'}),
                (2, 4, {'label': 'b'}),
            ]
        )

        valid_subgraph_a = labelled_edge_graph(1, 2, 'a')
        valid_subgraph_b = labelled_edge_graph(1, 2, 'f')
        valid_subgraph_c = labelled_edge_graph(3, 4, 'd')

        invalid_subgraph_a = labelled_edge_graph(1, 2, 'b')
        invalid_subgraph_b = labelled_edge_graph(2, 1, 'a')
        invalid_subgraph_c = labelled_edge_graph(1, 4, 'a')

        for present in (valid_subgraph_a, valid_subgraph_b, valid_subgraph_c):
            self.assertTrue(ParsemisMiner.is_subgraph(G, present))

        for absent in (invalid_subgraph_a, invalid_subgraph_b, invalid_subgraph_c):
            self.assertFalse(ParsemisMiner.is_subgraph(G, absent))
项目:parsemis_wrapper    作者:tomkdickinson    | 项目源码 | 文件源码
def test_frequent_graph(self):
        """Build a FrequentGraph from two parallel labelled edges and print its name."""
        G = nx.MultiDiGraph()
        for label in ('a', 'b'):
            G.add_edge(1, 2, label=label)

        frequent = FrequentGraph(G, [])
        print("Name: %s" % frequent.to_string())
项目:analyse_website_dns    作者:mrcheng0910    | 项目源码 | 文件源码
def chess_pgn_graph(pgn_file="chess_masters_WCC.pgn.bz2"):
    """Read chess games in pgn format in pgn_file.

    The file is always opened with ``bz2.BZ2File``, so it has to be
    bz2-compressed.

    Return the MultiDiGraph of players connected by a chess game: one edge
    from the ``White`` player to the ``Black`` player per game.
    Edges contain the remaining game tags as edge data.

    """
    import bz2
    G = nx.MultiDiGraph()
    game = {}
    # The context manager closes the file even if parsing fails part-way.
    with bz2.BZ2File(pgn_file) as datafile:
        for raw_line in datafile:
            line = raw_line.decode().rstrip('\r\n')
            if line.startswith('['):
                tag, value = line[1:-1].split(' ', 1)
                game[str(tag)] = value.strip('"')
            elif game:
                # The first non-tag line after a tag set means the game info
                # is complete.
                white = game.pop('White')
                black = game.pop('Black')
                G.add_edge(white, black, **game)
                game = {}
    return G
项目:analyse_website_dns    作者:mrcheng0910    | 项目源码 | 文件源码
def chess_pgn_graph(pgn_file="chess_masters_WCC.pgn.bz2"):
    """Read chess games in pgn format in pgn_file.

    The file is always opened with ``bz2.BZ2File``, so it has to be
    bz2-compressed.

    Return the MultiDiGraph of players connected by a chess game: one edge
    from the ``White`` player to the ``Black`` player per game.
    Edges contain the remaining game tags as edge data.

    """
    import bz2
    G = nx.MultiDiGraph()
    game = {}
    # The context manager closes the file even if parsing fails part-way.
    with bz2.BZ2File(pgn_file) as datafile:
        for raw_line in datafile:
            line = raw_line.decode().rstrip('\r\n')
            if line.startswith('['):
                tag, value = line[1:-1].split(' ', 1)
                game[str(tag)] = value.strip('"')
            elif game:
                # The first non-tag line after a tag set means the game info
                # is complete.
                white = game.pop('White')
                black = game.pop('Black')
                G.add_edge(white, black, **game)
                game = {}
    return G
项目:gtfspy    作者:CxAalto    | 项目源码 | 文件源码
def test_combined_stop_to_stop_transit_network(self):
        """The combined network is a MultiDiGraph whose edges all carry a route_type."""
        combined = networks.combined_stop_to_stop_transit_network(self.gtfs)
        self.assertIsInstance(combined, networkx.MultiDiGraph)
        for _, _, edge_data in combined.edges(data=True):
            self.assertIn("route_type", edge_data)
项目:python_cypher    作者:zacernst    | 项目源码 | 文件源码
def test_upper(self):
        """Test we can parse a CREATE... RETURN query."""
        g = nx.MultiDiGraph()
        query = 'CREATE (n:SOMECLASS) RETURN n'
        test_parser = python_cypher.CypherToNetworkx()
        # The results are iterated so the query actually runs; merely calling
        # query() and discarding the result would not execute it if query()
        # is lazy (other callers of this API loop over it for that reason).
        for _ in test_parser.query(g, query):
            pass
项目:python_cypher    作者:zacernst    | 项目源码 | 文件源码
def test_create_node(self):
        """Running a CREATE query must leave exactly one node in the graph."""
        graph = nx.MultiDiGraph()
        parser = python_cypher.CypherToNetworkx()
        statement = 'CREATE (n) RETURN n'
        # The results are iterated only to drive the query to completion.
        for _ in parser.query(graph, statement):
            pass
        self.assertEqual(len(graph.node), 1)