Source code for libadalina_core.sedona_configuration.sedona_configuration

import os
from pathlib import Path
from pyspark.sql import SparkSession
from sedona.spark import SedonaContext
import pandas as pd
from libadalina_core.sedona_configuration.jdk_installer import install_jdk_if_needed

# Compatibility shim: pandas 2.x removed DataFrame.iteritems, which the
# pandas conversion path in Spark 3.3 still calls.
pd.DataFrame.iteritems = pd.DataFrame.items

def _add_spark_config(spark: SparkSession.Builder, spark_configs: dict[str, str] | None = None) -> SparkSession.Builder:
    jars_dir = os.environ.get("SPARK_JARS_DIR")
    if jars_dir is not None:
        # Use locally provided jars instead of resolving the Sedona packages remotely.
        jar_files = Path(jars_dir).glob("*.jar")
        jars_string = ",".join(str(jar) for jar in jar_files)
        spark = spark.config("spark.jars", jars_string)
    else:
        # Otherwise resolve the Sedona and GeoTools artifacts from remote repositories.
        spark = (spark.config(
                    "spark.jars.packages",
                    "org.apache.sedona:sedona-spark-3.3_2.12:1.7.1,"
                    "org.datasyslab:geotools-wrapper:1.7.1-28.5",
                )
                .config(
                    "spark.jars.repositories",
                    "https://artifacts.unidata.ucar.edu/repository/unidata-all"
                ))
    if spark_configs is not None:
        for k, v in spark_configs.items():
            spark = spark.config(k, v)
    return spark
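
# Illustrative example (not part of the module): options passed through
# ``spark_configs`` are applied verbatim on top of the jar/package settings above,
# e.g. to raise the driver memory of a locally built session.
#
# >>> builder = SparkSession.builder.appName("Adalina")
# >>> builder = _add_spark_config(builder, {"spark.driver.memory": "4g"})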

def _get_sedona_master_configuration(master_host: str, spark_configs: dict[str, str] | None = None) -> SparkSession:
    spark = SparkSession.builder.appName("Adalina").master(master_host)
    spark = _add_spark_config(spark, spark_configs)
    return SedonaContext.create(spark.getOrCreate())

def _sedona_configuration(spark_configs: dict[str, str] | None = None) -> SparkSession:
    config = SedonaContext.builder().appName("Adalina")
    config = _add_spark_config(config, spark_configs)
    return SedonaContext.create(config.getOrCreate())

_sedona_context: SparkSession | None = None

def init_sedona_context(
    spark_master: str | None = None,
    spark: SparkSession | None = None,
    spark_configs: dict[str, str] | None = None,
):
    """
    Initialize the Sedona context for spatial data processing.

    This function can either:

    1. Create a new Sedona context with a specified Spark master,
    2. Use an existing SparkSession, or
    3. Create a default Sedona context with the default Spark configuration.

    If no parameters are provided, it creates a default Sedona context (option 3).
    If the `JAVA_HOME` environment variable is not set, it will attempt to install
    a compatible JDK.

    Parameters
    ----------
    spark_master : str, optional
        The Spark master URL to connect to. If provided, a new Sedona context
        will be created with this master.
    spark : pyspark.sql.SparkSession, optional
        An existing SparkSession to use. If provided, it will be used to create
        the Sedona context.
    spark_configs : dict[str, str], optional
        Additional Spark configuration options, applied to the builder as
        key-value pairs.

    Examples
    --------
    Initialize the global Sedona session with a default configuration

    >>> init_sedona_context()

    Initialize the global Sedona session pointing to a given Spark master

    >>> init_sedona_context(spark_master="spark://localhost:7077")

    Initialize the session using a pre-existing SparkSession

    >>> spark = SparkSession.builder.getOrCreate()
    >>> init_sedona_context(spark=spark)
    """
    global _sedona_context

    install_jdk_if_needed()
    if spark_master is not None:
        _sedona_context = _get_sedona_master_configuration(spark_master, spark_configs)
    elif isinstance(spark, SparkSession):
        _sedona_context = SedonaContext.create(spark)
    else:
        _sedona_context = _sedona_configuration(spark_configs)
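
# Illustrative example: ``spark_configs`` also works with the default configuration,
# e.g. to raise the driver memory of the local session.
#
# >>> init_sedona_context(spark_configs={"spark.driver.memory": "4g"})
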
def get_sedona_context() -> SparkSession:
    """
    Get the Sedona context for spatial data processing.

    This context is the one used for all spatial operations in libadalina.
    If the Sedona context has not been initialized yet with `init_sedona_context`,
    `init_sedona_context` will be called to initialize it with the default
    configuration.

    Returns
    -------
    pyspark.sql.SparkSession
        The Sedona context as a SparkSession.
    """
    global _sedona_context

    if _sedona_context is None:
        init_sedona_context()
    return _sedona_context
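
# Illustrative example: typical usage fetches the shared context lazily and runs
# Sedona SQL on it (``ST_Point`` is a built-in Sedona SQL function).
#
# >>> sedona = get_sedona_context()
# >>> sedona.sql("SELECT ST_Point(1.0, 2.0) AS geom").show()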