# Source code for libadalina_core.spatial_operators.single_table_operators

from enum import Enum
from dataclasses import dataclass
from sedona.sql import ST_Area, ST_Intersection, ST_Union, ST_Buffer, ST_GeometryType, ST_Dump, ST_Intersects, ST_XMin, \
    ST_XMax, ST_YMax, ST_YMin, ST_GeomFromWKT, ST_Perimeter, ST_AreaSpheroid
from libadalina_core.sedona_utils import to_spark_dataframe, DEFAULT_EPSG, DataFrame
import pyspark.sql.functions as func
import pyspark.sql as ps
from shapely import Polygon

def area(df: DataFrame) -> ps.DataFrame:
    """
    Append an 'area' column with the spheroid-based area of each geometry.

    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame or pyspark.sql.DataFrame
        Input table with a 'geometry' column.

    Returns
    -------
    pyspark.sql.DataFrame
        A Spark DataFrame with an additional column 'area' containing the
        area of each geometry (computed with ST_AreaSpheroid).
    """
    spark_df = to_spark_dataframe(df)
    return spark_df.withColumn('area', ST_AreaSpheroid(func.col('geometry')))
def perimeter(df: DataFrame) -> ps.DataFrame:
    """
    Append a 'perimeter' column with the spheroid-based perimeter of each geometry.

    Parameters
    ----------
    df : pandas.DataFrame or geopandas.GeoDataFrame or pyspark.sql.DataFrame
        Input table with a 'geometry' column.

    Returns
    -------
    pyspark.sql.DataFrame
        A Spark DataFrame with an additional column 'perimeter' containing
        the perimeter of each geometry (ST_Perimeter with use_spheroid=True).
    """
    spark_df = to_spark_dataframe(df)
    perimeter_col = ST_Perimeter(func.col('geometry'), use_spheroid=True)
    return spark_df.withColumn('perimeter', perimeter_col)
def bounding_box(df: DataFrame) -> Polygon:
    """
    Compute the axis-aligned bounding box of all geometries in a DataFrame.

    Parameters
    ----------
    df : DataFrame
        Either a pandas DataFrame, a GeoPandas GeoDataFrame, or a Spark
        DataFrame containing geometries.

    Returns
    -------
    shapely.geometry.Polygon
        The rectangle spanning the overall min/max x and y coordinates of
        all geometries in the DataFrame.
    """
    spark_df = to_spark_dataframe(df)
    extent = spark_df.select(
        func.min(ST_XMin(spark_df.geometry)).alias('min_x'),
        func.min(ST_YMin(spark_df.geometry)).alias('min_y'),
        func.max(ST_XMax(spark_df.geometry)).alias('max_x'),
        func.max(ST_YMax(spark_df.geometry)).alias('max_y'),
    ).first()
    # Corners listed counter-clockwise starting from the lower-left.
    corners = [
        (extent.min_x, extent.min_y),
        (extent.max_x, extent.min_y),
        (extent.max_x, extent.max_y),
        (extent.min_x, extent.max_y),
    ]
    return Polygon(corners)
def cut_features(df: DataFrame, cut_geometry: Polygon) -> ps.DataFrame:
    """
    Keep only the features that intersect a given geometry.

    Rows whose geometry does not intersect ``cut_geometry`` are dropped;
    the geometries themselves are returned unmodified (no clipping).

    Parameters
    ----------
    df : DataFrame
        Either a pandas DataFrame, a GeoPandas GeoDataFrame, or a Spark
        DataFrame containing geometries.
    cut_geometry : Polygon
        The geometry used to select features.

    Returns
    -------
    pyspark.sql.DataFrame
        A Spark DataFrame with only the rows whose geometry intersects
        the cut geometry.
    """
    spark_df = to_spark_dataframe(df)
    # Materialise the cutter as a literal column so Sedona can evaluate
    # the intersection predicate row by row.
    with_cutter = spark_df.withColumn(
        'cutting_geometry', ST_GeomFromWKT(func.lit(cut_geometry.wkt))
    )
    intersecting = with_cutter.where(
        ST_Intersects(with_cutter.geometry, func.col('cutting_geometry'))
    )
    return intersecting.drop('cutting_geometry')
def polygonize(df: DataFrame, radius_meters: float) -> ps.DataFrame:
    """
    Transform lines and points into polygons by buffering them with a given radius.

    Each line (or multi-line) is buffered on both sides (flat end caps),
    points are buffered into a circular area, and any other geometry type
    is passed through unchanged.

    Parameters
    ----------
    df : DataFrame
        The input DataFrame containing geometries.
    radius_meters : float
        The radius in meters to use for buffering points and lines.

    Returns
    -------
    pyspark.sql.DataFrame
        A Spark DataFrame with a new column 'polygonized_geometry'
        containing the buffered geometries.

    Examples
    --------
    >>> df = pd.DataFrame({'geometry': ['POINT(1 1)', 'LINESTRING(0 0, 1 1, 2 2)']})
    >>> polygonized_df = polygonize(df, radius_meters=10)
    """
    spark_df = to_spark_dataframe(df)
    geom = func.col('geometry')
    geom_type = ST_GeometryType(geom)
    # True -> buffer distances are interpreted in meters on the spheroid.
    use_spheroid = func.lit(True)
    left_half = ST_Buffer(geom, radius_meters, use_spheroid,
                          parameters=func.lit('endcap=flat side=left'))
    right_half = ST_Buffer(geom, radius_meters, use_spheroid,
                           parameters=func.lit('endcap=flat side=right'))
    polygonized = (
        func.when(geom_type.like('%Point%'),
                  ST_Buffer(geom, radius_meters, use_spheroid))
        .when(geom_type.like('%LineString%'), ST_Union(left_half, right_half))
        .otherwise(geom)
        .alias('polygonized_geometry')
    )
    return spark_df.select('*', polygonized)
def explode_multi_geometry(df: DataFrame) -> ps.DataFrame:
    """
    Explode multi-geometry features into individual geometries so that each
    element of a multi-geometry becomes a separate row in the DataFrame.

    Parameters
    ----------
    df : DataFrame
        Either a pandas DataFrame, a GeoPandas GeoDataFrame, or a Spark
        DataFrame containing geometries.

    Returns
    -------
    pyspark.sql.DataFrame
        The input DataFrame with an additional column that contains the
        exploded geometries.
    """
    table = to_spark_dataframe(df)
    # BUG FIX: the original referenced `df.geometry` below, i.e. the raw
    # input before conversion. That fails for pandas/geopandas inputs and
    # mixes column references from two different DataFrames; all column
    # references must go through the converted Spark DataFrame `table`.
    geometry = table.geometry
    return table.select(
        "*",
        func
        # NOTE(review): func.array() yields an empty array column while the
        # other branches yield geometry — confirm Spark accepts this mix.
        .when(geometry.isNull(), func.array())
        .when(ST_GeometryType(geometry).like('%Multi%'),
              func.explode(ST_Dump(geometry)))
        .otherwise(geometry)
    )
class AggregationType(Enum):
    """Functions that can be used to aggregate data in a DataFrame."""

    COUNT = 'count'
    """Count the number of rows in a group."""
    SUM = 'sum'
    """Sum the values in a column for each group."""
    AVG = 'avg'
    """Calculate the average of the values in a column for each group."""
    MIN = 'min'
    """Find the minimum value in a column for each group."""
    MAX = 'max'
    """Find the maximum value in a column for each group."""

    def to_spark_func(self):
        """Return the pyspark.sql.functions callable matching this type."""
        dispatch = {
            AggregationType.COUNT: func.count,
            AggregationType.SUM: func.sum,
            AggregationType.AVG: func.avg,
            AggregationType.MIN: func.min,
            AggregationType.MAX: func.max,
        }
        # Default to count if none matched, mirroring the defensive
        # fallback of the original if/elif chain (unreachable for
        # valid enum members).
        return dispatch.get(self, func.count)

    def __str__(self):
        return self.value

    def __repr__(self):
        return self.value
@dataclass
class AggregationFunction:
    """Specification of one aggregation to apply to a DataFrame column."""

    column: str
    """Name of the column whose values are aggregated."""
    aggregation_type: AggregationType
    """Aggregation function to apply to the column."""
    alias: str | None = None
    """Optional name for the resulting aggregated column."""
    proportional: str | None = None
    """Optional geometry column enabling proportional aggregation: when set,
    each value is weighted by the area of its intersection with this
    column's geometry relative to that geometry's total area."""
def spatial_aggregation(table: DataFrame,
                        aggregate_functions: list[AggregationFunction],
                        group_by_column: str = 'geometry'
                        ) -> ps.DataFrame:
    """
    Perform spatial aggregation on a DataFrame based on specified aggregation functions.

    Entries are grouped by ``group_by_column`` (the geometry column by
    default) and the given aggregation functions are applied to the grouped
    columns. Columns not mentioned in any aggregation function are reduced
    to the first value found in each group.

    Parameters
    ----------
    table : DataFrame
        Either a pandas DataFrame, a GeoPandas GeoDataFrame, or a Spark
        DataFrame containing geometries.
    aggregate_functions : list[AggregationFunction]
        Aggregation functions to apply to the grouped columns. Functions
        naming a column absent from the table are silently skipped.
    group_by_column : str, optional
        The name of the column to group by. Default is 'geometry'.

    Returns
    -------
    pyspark.sql.DataFrame
        A DataFrame with one row per group and the aggregated columns.
    """
    spark_df = to_spark_dataframe(table)
    requested = {agg.column for agg in aggregate_functions}

    def _output_name(agg: AggregationFunction) -> str:
        # Default label mimics Spark's "fn(col)" naming convention.
        if agg.alias is not None:
            return agg.alias
        return f"{agg.aggregation_type.value}({agg.column})"

    # Columns with no explicit aggregation keep the first value per group.
    passthrough = [
        func.first(name).alias(name)
        for name in spark_df.columns
        if name != group_by_column and name not in requested
    ]

    plain_aggregations = []
    weighted_aggregations = []
    for agg in aggregate_functions:
        if agg.column not in spark_df.columns:
            continue
        spark_fn = agg.aggregation_type.to_spark_func()
        if agg.proportional is None:
            plain_aggregations.append(
                spark_fn(func.col(agg.column)).alias(_output_name(agg))
            )
        else:
            # Weight each value by the share of the proportional geometry's
            # area that falls inside the group geometry.
            weight = (
                ST_Area(ST_Intersection(func.col('geometry'),
                                        func.col(agg.proportional)))
                / ST_Area(func.col(agg.proportional))
            )
            weighted_aggregations.append(
                spark_fn(func.col(agg.column) * weight).alias(_output_name(agg))
            )

    return (spark_df
            .groupby(group_by_column)
            .agg(*passthrough, *plain_aggregations, *weighted_aggregations))