Source code for libadalina_core.graph_extraction.builders.graph_builder

import networkx as nx
import shapely
from pyspark.sql import Column
import pyspark.sql as ps
from sedona.sql import ST_Points, ST_Dump, ST_LineSegments, ST_Intersects, ST_Length, ST_LengthSpheroid

from libadalina_core.sedona_utils import DataFrame
from libadalina_core.spatial_operators import spatial_join, spatial_aggregation, AggregationFunction, JoinType, polygonize

from libadalina_core.graph_extraction.readers import MandatoryColumns, OneWay
from pyspark.sql import functions as F

from libadalina_core.sedona_utils import to_spark_dataframe

def all_columns_except_geometry(df: ps.DataFrame):
    return [F.col(c) for c in df.columns if c != 'geometry']

def get_column_name(col: Column) -> str:
    return col._jc.toString().split('.')[-1]

def explode_multipoint_to_points(df: ps.DataFrame) -> ps.DataFrame:
    return (df
            .select(F.explode(ST_Dump(ST_Points(F.col("geometry")))).alias("geometry"))
            .distinct()).select(F.monotonically_increasing_id().alias('uuid'), F.col('geometry'))

def explode_multiline_to_lines(df: ps.DataFrame) -> ps.DataFrame:
    return (df
            .select(*all_columns_except_geometry(df),
                    F.explode(ST_LineSegments(F.col("geometry"))).alias('geometry'))
            ).withColumn('distance', ST_LengthSpheroid(F.col('geometry')))

def join_lines_with_points(lines_df: ps.DataFrame, points_df: ps.DataFrame) -> ps.DataFrame:
    return (lines_df
            .join(points_df, on=ST_Intersects(lines_df.geometry, points_df.geometry), how='inner')
            .groupby(lines_df.geometry)
                .agg(
                    *(F.first(c).alias(get_column_name(c)) for c in all_columns_except_geometry(lines_df)),
                    F.collect_list(points_df.uuid).alias('points_uuid'),
                    F.collect_list(points_df.geometry).alias('points_geometry'),
                )
            )

def _add_arc(graph: nx.Graph, point1, point2, direction, data: dict):
    if direction == OneWay.Forward.value:
        graph.add_edge(point1, point2, **data)
    elif direction == OneWay.Backward.value:
        graph.add_edge(point2, point1, **data)
    else:
        graph.add_edge(point1, point2, **data)
        graph.add_edge(point2, point1, **data)


def _add_arcs(graph: nx.Graph, df: DataFrame) -> nx.Graph:
    for row in df.collect():
        points = row['points_uuid']
        point_geometry = row['points_geometry']

        if len(points) != 2:
            raise Exception(f"invalid number of points in line: {points}")
        if len(point_geometry) != 2:
            raise Exception(f"invalid number of points in line: {points}")

        direction = row[MandatoryColumns.oneway.value]
        if len(points) != 2:
            raise Exception(f"invalid number of points in line: {points}")

        point1, point2 = points if point_geometry[0].coords[0] == row['geometry'].coords[0] else points[::-1]
        _add_arc(graph, point1, point2, direction, {
            k: v for k, v in row.asDict().items() if k not in ['points_uuid', 'points_geometry', MandatoryColumns.oneway.value]
        })

    return graph

def reduce_graph(graph: nx.Graph) -> nx.Graph:
    while True:
        nodes_to_reduce = (n for n in graph.nodes() if graph.degree(n) == 2)

        for node in nodes_to_reduce:
            neighbors = list(graph.neighbors(node))
            edges = graph.edges(node, data=True)

            if len(neighbors) != 2 or len(edges) != 2:
                print("Riduzione:", node, graph.degree(node), edges, neighbors)
                continue

            n1, n2 = neighbors
            if n1 == n2 or graph.has_edge(n1, n2) or graph.has_edge(n2, n1):
                # skip to avoid multiple edges between same pair of nodes
                # which would require aggregation of edge attributes
               continue

            edge1, edge2 = edges

            edge1_data = graph.edges[node, n1]
            edge2_data = graph.edges[node, n2]

            merged_line = shapely.line_merge(shapely.MultiLineString([edge1_data['geometry'], edge2_data['geometry']]))
            coords = list(merged_line.coords)
            merged_data = {
                'geometry': shapely.LineString([shapely.geometry.Point(coords[0]), shapely.geometry.Point(coords[-1])]),
            }

            graph.remove_node(node)
            graph.add_edge(n1, n2, **merged_data)


    return reduced

def remove_nodes_without_edges(graph: nx.Graph) -> nx.Graph:
    graph.remove_nodes_from(n for n in graph.nodes() if graph.degree(n) == 0)
    return graph

[docs] def build_graph(roads_df: DataFrame, name: str = 'graph', joined_df: DataFrame | None = None, aggregate_functions: list[AggregationFunction] = None, buffer_radius_meters: float = 100, dataframe_only=False) -> nx.Graph | ps.DataFrame: """ Build a directed graph from a DataFrame of roads. Optionally, enrich the graph with data from another DataFrame using spatial join and aggregation. By default, each edge of the graph is already enriched with distance information. Parameters ---------- roads_df : DataFrame Road network as a DataFrame. Must contain at least columns of `MandatoryColumns` enumeration. name : str The name of the graph. Defaults to 'graph'. joined_df : DataFrame | None DataFrame to join with the roads DataFrame. If None, no join is performed. Defaults to None. aggregate_functions : list[AggregationFunction] List of aggregation functions to apply during the spatial aggregation after the join. If None, no aggregation is performed. Defaults to None. See also `AggregationFunction` and `spatial_aggregation` for more details. buffer_radius_meters : float The radius in meters to buffer the road segments before performing the spatial join. If joined_df is None, this parameter is ignored. Defaults to 100 meters. dataframe_only : bool If True, the function stops before building a networkx Graph and returns a Spark DataFrame instead. Returns ------- nx.Graph | pyspark.sql.DataFrame The resulting directed graph as a networkx DiGraph or a Spark DataFrame if dataframe_only is True. """ roads_df = to_spark_dataframe(roads_df) points_df = explode_multipoint_to_points(roads_df) segments_df = explode_multiline_to_lines(roads_df) del roads_df if joined_df is not None: buffered_df = (polygonize(segments_df, radius_meters=buffer_radius_meters) .withColumnRenamed('geometry', 'road_geometry') .withColumnRenamed('polygonized_geometry', 'geometry') .withColumn('__join_id', F.monotonically_increasing_id()) ) join_result = (spatial_join(buffered_df, joined_df, JoinType.INNER) .withColumnRenamed('geometry_left', 'geometry') ) segments_df = (spatial_aggregation(join_result, group_by_column='__join_id', aggregate_functions=aggregate_functions if aggregate_functions else [] ).drop('__join_id').drop('geometry').drop('geometry_right').withColumnRenamed('road_geometry', 'geometry')) lines_df = join_lines_with_points(segments_df, points_df) del segments_df del points_df if dataframe_only: return lines_df graph = _add_arcs(nx.DiGraph(), lines_df) del lines_df graph = remove_nodes_without_edges(graph) graph.name = name return graph