Source code for libadalina_core.writers.writers
import pandas as pd
import geopandas as gpd
import pyspark.sql as ps
from libadalina_core.sedona_utils import DEFAULT_EPSG, DataFrame
def dataframe_to_csv(df: DataFrame, path: str, separator: str = ',') -> None:
"""
Write a DataFrame to a CSV file.
The DataFrame geometry is assumed to use the coordinate reference system specified by `libadalina_core.sedona_utils.DEFAULT_EPSG`.
Parameters
----------
df : pandas.DataFrame or geopandas.GeoDataFrame or pyspark.sql.DataFrame
The DataFrame to write, which can be a pandas DataFrame, a GeoPandas GeoDataFrame, or a Spark DataFrame.
path : str
The path to the CSV file where the DataFrame will be saved.
separator : str
The delimiter to use in the CSV file. Default is ','.
"""
if isinstance(df, ps.DataFrame):
df = df.toPandas()
elif isinstance(df, gpd.GeoDataFrame):
df = pd.DataFrame(df)
elif isinstance(df, pd.DataFrame):
pass # already a Pandas DataFrame
else:
raise TypeError(f"Unsupported type {type(df)}. Expected pandas DataFrame, geopandas GeoDataFrame, or spark DataFrame.")
df.to_csv(path, sep=separator, index=False)
[docs]
def dataframe_to_geopackage(df: DataFrame, path: str):
"""
Write a DataFrame to a GeoPackage file.
The DataFrame geometry is assumed to use the coordinate reference system specified by `libadalina_core.sedona_utils.DEFAULT_EPSG`.
Parameters
----------
df : pandas.DataFrame or geopandas.GeoDataFrame or pyspark.sql.DataFrame
The DataFrame to write, which can be a pandas DataFrame, a GeoPandas GeoDataFrame, or a Spark DataFrame.
path : str
The path to the GeoPackage file where the DataFrame will be saved.
Examples
--------
>>> df = pd.DataFrame({'id': [1, 2], 'geometry': ['POINT(1 1)', 'POINT(2 2)']})
>>> dataframe_to_geopackage(df, 'output.gpkg')
"""
if isinstance(df, ps.DataFrame):
df = gpd.GeoDataFrame(df.toPandas(), geometry = 'geometry', crs = DEFAULT_EPSG.value)
elif isinstance(df, pd.DataFrame):
df = gpd.GeoDataFrame(df, geometry='geometry', crs=DEFAULT_EPSG.value)
elif isinstance(df, gpd.GeoDataFrame):
pass # already a GeoDataFrame
else:
raise TypeError(f"Unsupported type {type(df)}. Expected pandas DataFrame, geopandas GeoDataFrame, or spark DataFrame.")
df.to_file(path, layer='dataframe', driver="GPKG")