Skip to content

utils

This module contains utility functions for the multimno package.

apply_schema_casting(sdf, schema)

This function takes a DataFrame and a schema, and applies the schema to the DataFrame. It selects the columns in the DataFrame that are in the schema, and casts each column to the type specified in the schema.

Parameters:

Name Type Description Default
sdf DataFrame

The DataFrame to apply the schema to.

required
schema StructType

The schema to apply to the DataFrame.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame that includes the same rows as the input DataFrame,

DataFrame

but with the columns cast to the types specified in the schema.

Source code in multimno/core/utils.py
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def apply_schema_casting(sdf: DataFrame, schema: StructType) -> DataFrame:
    """
    Projects a DataFrame onto a schema and casts every column to the schema's type.

    Only the columns named in the schema are selected, in schema order, and each one is
    cast to the data type declared by its schema field.

    Args:
        sdf (DataFrame): The DataFrame to apply the schema to.
        schema (StructType): The schema to apply to the DataFrame.

    Returns:
        DataFrame: A new DataFrame that includes the same rows as the input DataFrame,
        but with the columns cast to the types specified in the schema.
    """
    # Build every cast inside one projection. Calling withColumn once per field stacks
    # a separate projection per column, which bloats the query plan for wide schemas.
    # The explicit alias keeps the output column name equal to the schema field name.
    casted_columns = [F.col(field.name).cast(field.dataType).alias(field.name) for field in schema.fields]
    return sdf.select(*casted_columns)

calc_hashed_user_id(df, user_column=ColNames.user_id)

Calculates the SHA-256 hash of the user id (cast to string) and stores the raw digest bytes in the user id column.

Parameters:

Name Type Description Default
df DataFrame

Data of clean synthetic events with a user id column.

required

Returns:

Type Description
DataFrame

pyspark.sql.DataFrame: Dataframe, where user_id column is transformed to a hashed value.

Source code in multimno/core/utils.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def calc_hashed_user_id(df: DataFrame, user_column: str = ColNames.user_id) -> DataFrame:
    """
    Replaces the user id column with the SHA-256 digest of its string representation.

    The column is cast to string, hashed with SHA2 (256 bits), and the hexadecimal digest
    is decoded with ``unhex``, so the resulting column holds the digest as binary data.

    Args:
        df (pyspark.sql.DataFrame): Data with a user id column.
        user_column (str, optional): Name of the column to hash. Defaults to ColNames.user_id.

    Returns:
        pyspark.sql.DataFrame: Dataframe, where the user column is replaced by its hashed value.

    """
    user_id_as_text = F.col(user_column).cast("string")
    digest_bytes = F.unhex(F.sha2(user_id_as_text, 256))
    return df.withColumn(user_column, digest_bytes)

clip_polygons_with_mask_polygons(input_sdf, mask_sdf, cols_to_keep, self_intersection=False, geometry_column='geometry')

Cuts polygons in the input DataFrame with mask polygons from another DataFrame. This function takes two DataFrames: one with input polygons and another with mask polygons. It cuts the input polygons with the mask polygons and returns a new DataFrame with the resulting polygons. Both DataFrames must use the same coordinate system.

Args: input_sdf (DataFrame): A DataFrame containing the input polygons. mask_sdf (DataFrame): A DataFrame containing the mask polygons. cols_to_keep (list): A list of column names to keep from the input DataFrame. geometry_column (str, optional): The name of the geometry column in the DataFrames. Defaults to "geometry".

Returns: DataFrame: A DataFrame containing the resulting polygons after cutting the input polygons with the mask polygons.

Source code in multimno/core/utils.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def clip_polygons_with_mask_polygons(
    input_sdf: DataFrame,
    mask_sdf: DataFrame,
    cols_to_keep: List[str],
    self_intersection=False,
    geometry_column: str = "geometry",
) -> DataFrame:
    """
    Cuts polygons in the input DataFrame with mask polygons from another DataFrame.
    This function takes two DataFrames: one with input polygons and another with mask polygons.
    It cuts the input polygons with the mask polygons, and returns a new DataFrame with the resulting polygons.
    Both dataframes have to have same coordinate system.
    Args:
        input_sdf (DataFrame): A DataFrame containing the input polygons.
        mask_sdf (DataFrame): A DataFrame containing the mask polygons.
            Not used when self_intersection is True.
        cols_to_keep (list): A list of column names to keep from the input DataFrame.
        self_intersection (bool, optional): If True, the input polygons are cut with the
            smaller-area input polygons they intersect instead of with mask_sdf. Defaults to False.
        geometry_column (str, optional): The name of the geometry column in the DataFrames.
            Defaults to "geometry".
    Returns:
        DataFrame: A DataFrame containing the resulting polygons after cutting the input polygons with the mask polygons.
    """
    # Temporary row id used to group the join result and to find untouched polygons.
    input_sdf = input_sdf.withColumn("id", F.monotonically_increasing_id())
    cols_to_keep = [f"a.{name}" for name in cols_to_keep]
    # Build the spatial predicate from geometry_column so that a non-default
    # geometry column name is honoured by the join as well.
    intersects = STP.ST_Intersects(f"a.{geometry_column}", f"b.{geometry_column}")
    if self_intersection:
        # Join smaller polygons to larger polygons
        input_sdf = input_sdf.withColumn("area", STF.ST_Area(geometry_column))
        intersection = input_sdf.alias("a").join(
            input_sdf.alias("b"),
            intersects & (F.col("a.area") > F.col("b.area")),
        )
        input_sdf = input_sdf.drop("area")
    else:
        intersection = input_sdf.alias("a").join(
            mask_sdf.alias("b"),
            intersects,
        )
    # Merge all cutting geometries of one input polygon into a single geometry.
    intersection_cut = (
        intersection.groupby("a.id", *cols_to_keep)
        .agg(F.array_agg(f"b.{geometry_column}").alias("cut_geometry"))
        .withColumn("cut_geometry", STF.ST_Union("cut_geometry"))
    )
    intersection_cut = fix_geometry(intersection_cut, 3, "cut_geometry")
    # NOTE(review): a.<geometry_column> is read after the aggregation, so the geometry
    # column presumably has to be listed in cols_to_keep - confirm against callers.
    intersection_cut = intersection_cut.withColumn(
        geometry_column, STF.ST_Difference(f"a.{geometry_column}", "cut_geometry")
    ).drop("cut_geometry")

    # Polygons without any intersection are passed through unchanged.
    non_intersection = input_sdf.join(intersection_cut, ["id"], "left_anti")
    # NOTE(review): union matches columns by position, so both sides are assumed to
    # have the same column order - verify for inputs with extra columns.
    full_sdf = non_intersection.union(intersection_cut).drop("id")

    full_sdf = fix_geometry(full_sdf, 3, geometry_column)

    return full_sdf

cut_geodata_to_extent(sdf, extent, target_crs, geometry_column='geometry')

Cuts geometries in a DataFrame to a specified extent.

Parameters:

Name Type Description Default
sdf DataFrame

The DataFrame to filter. The DataFrame must contain a geometry column.

required
extent tuple

A tuple representing the extent. The tuple contains four elements: (west, south, east, north), which are the western, southern, eastern, and northern bounds of the WGS84 extent.

required
target_crs int

The CRS of DataFrame to transform the extent to.

required
geometry_column str

The name of the geometry column. Defaults to "geometry".

'geometry'

Returns:

Name Type Description
DataFrame DataFrame

A DataFrame containing the same rows as the input DataFrame, but with the geometries cut to the extent.

Source code in multimno/core/utils.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def cut_geodata_to_extent(
    sdf: DataFrame,
    extent: Tuple[float, float, float, float],
    target_crs: int,
    geometry_column: str = "geometry",
) -> DataFrame:
    """
    Clips the geometries of a DataFrame to a given extent.

    Rows are first filtered with filter_geodata_to_extent; each remaining geometry is then
    replaced by its intersection with the extent polygon.

    Args:
        sdf (DataFrame): The DataFrame to cut. The DataFrame must contain a geometry column.
        extent (tuple): The extent as (west, south, east, north), i.e. the western, southern,
            eastern, and northern bounds of the WGS84 extent.
        target_crs (int): The CRS of the DataFrame; the extent is transformed to it
            unless it is 4326.
        geometry_column (str, optional): The name of the geometry column. Defaults to "geometry".

    Returns:
        DataFrame: The filtered DataFrame with the geometries cut to the extent.
    """
    intersecting_sdf = filter_geodata_to_extent(sdf, extent, target_crs, geometry_column)

    # Build the extent polygon and bring it into the CRS of the data when needed.
    extent_polygon = STC.ST_PolygonFromEnvelope(*extent)
    if target_crs != 4326:
        extent_polygon = STF.ST_Transform(extent_polygon, F.lit("EPSG:4326"), F.lit(f"EPSG:{target_crs}"))

    clipped_geometry = STF.ST_Intersection(F.col(geometry_column), extent_polygon)
    return intersecting_sdf.withColumn(geometry_column, clipped_geometry)

filter_geodata_to_extent(sdf, extent, target_crs, geometry_column='geometry')

Filters a DataFrame to include only rows with geometries that intersect a specified extent.

Parameters:

Name Type Description Default
sdf DataFrame

The DataFrame to filter. The DataFrame must contain a geometry column.

required
extent tuple

A tuple representing the extent. The tuple contains four elements: (west, south, east, north), which are the western, southern, eastern, and northern bounds of the WGS84 extent.

required
target_crs int

The CRS of DataFrame to transform the extent to.

required
geometry_column str

The name of the geometry column. Defaults to "geometry".

'geometry'

Returns:

Name Type Description
DataFrame DataFrame

A DataFrame containing only the rows from the input DataFrame where the geometry intersects the extent.

Source code in multimno/core/utils.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def filter_geodata_to_extent(
    sdf: DataFrame,
    extent: Tuple[float, float, float, float],
    target_crs: int,
    geometry_column: str = "geometry",
) -> DataFrame:
    """
    Keeps only the rows of a DataFrame whose geometry intersects a given extent.

    Args:
        sdf (DataFrame): The DataFrame to filter. The DataFrame must contain a geometry column.
        extent (tuple): The extent as (west, south, east, north), i.e. the western, southern,
            eastern, and northern bounds of the WGS84 extent.
        target_crs (int): The CRS of the DataFrame; the extent is transformed to it
            unless it is 4326.
        geometry_column (str, optional): The name of the geometry column. Defaults to "geometry".

    Returns:
        DataFrame: The rows of the input DataFrame whose geometry intersects the extent.
    """
    # Build the extent polygon and bring it into the CRS of the data when needed.
    extent_polygon = STC.ST_PolygonFromEnvelope(*extent)
    if target_crs != 4326:
        extent_polygon = STF.ST_Transform(extent_polygon, F.lit("EPSG:4326"), F.lit(f"EPSG:{target_crs}"))

    intersects_extent = STP.ST_Intersects(extent_polygon, F.col(geometry_column))
    return sdf.filter(intersects_extent)

fix_geometry(sdf, geometry_type, geometry_column='geometry')

Fixes the geometry of a given type in a DataFrame. This function applies several operations to the geometries in the specified geometry column of the DataFrame:

1. If a geometry is a collection of geometries, extracts only the geometries of the given type.
2. Filters out any geometries of a type other than the given one.
3. Removes any invalid geometries.
4. Removes any empty geometries.

Args: sdf (DataFrame): The DataFrame containing the geometries to check. geometry_column (str, optional): The name of the column containing the geometries. Defaults to "geometry".

Returns: DataFrame: The DataFrame with the fixed polygon geometries.

Source code in multimno/core/utils.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def fix_geometry(sdf: DataFrame, geometry_type: int, geometry_column: str = "geometry") -> DataFrame:
    """
    Cleans the geometries of a DataFrame so that only usable geometries of one type remain.
    The following steps are applied to the geometry column, in this order:
    1. Geometry collections are reduced to their members of the given type.
    2. Empty geometries are removed.
    3. Geometries whose type name does not contain the expected name are removed.
    4. Invalid geometries are removed.
    5. Coordinate precision is reduced with ST_ReducePrecision (precision argument 5).
    Args:
        sdf (DataFrame): The DataFrame containing the geometries to check.
        geometry_type (int): Type code passed to ST_CollectionExtract. 3 is matched against
            "Polygon" type names, 2 against "Line", and any other value against "Point".
        geometry_column (str, optional): The name of the column containing the geometries. Defaults to "geometry".
    Returns:
        DataFrame: The DataFrame with the fixed geometries.
    """
    # Name fragment used to match the ST_GeometryType output for the requested type.
    if geometry_type == 3:
        geometry_name = "Polygon"
    elif geometry_type == 2:
        geometry_name = "Line"
    else:
        geometry_name = "Point"

    geom = F.col(geometry_column)
    extracted_geometry = F.when(
        STF.ST_IsCollection(geom),
        STF.ST_CollectionExtract(geometry_column, F.lit(geometry_type)),
    ).otherwise(geom)

    sdf = sdf.withColumn(geometry_column, extracted_geometry)
    sdf = sdf.filter(~STF.ST_IsEmpty(geom))
    sdf = sdf.filter(STF.ST_GeometryType(geom).like(f"%{geometry_name}%"))
    sdf = sdf.filter(STF.ST_IsValid(geometry_column))
    return sdf.withColumn(geometry_column, STF.ST_ReducePrecision(geom, F.lit(5)))

get_epsg_from_geometry_column(df)

Get the EPSG code from the geometry column of a DataFrame.

Parameters:

Name Type Description Default
df DataFrame

DataFrame with a geometry column.

required

Raises:

Type Description
ValueError

If the DataFrame contains multiple EPSG codes.

Returns:

Name Type Description
int int

EPSG code of the geometry column.

Source code in multimno/core/utils.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def get_epsg_from_geometry_column(df: DataFrame) -> int:
    """
    Get the EPSG code from the geometry column of a DataFrame.

    The SRID is read from the column named "geometry".

    Args:
        df (DataFrame): DataFrame with a geometry column.

    Raises:
        ValueError: If the DataFrame contains multiple EPSG codes, or if it has no rows
            to read an EPSG code from.

    Returns:
        int: EPSG code of the geometry column.
    """
    # Two distinct rows are enough to tell "exactly one SRID" from "several". Collecting
    # once also avoids persisting a temporary DataFrame that would never be unpersisted.
    srid_rows = df.select(STF.ST_SRID("geometry")).distinct().limit(2).collect()
    if len(srid_rows) > 1:
        raise ValueError("Dataframe contains multiple EPSG codes")
    if not srid_rows:
        raise ValueError("Dataframe is empty, cannot determine EPSG code")

    return srid_rows[0][0]

merge_geom_within_mask_geom(input_sdf, mask_sdf, cols_to_keep, geometry_col)

Merges geometries from an input DataFrame that intersect with geometries from a mask DataFrame.

This function performs a spatial join between input and mask DataFrames using ST_Intersects, calculates the geometric intersection between each matching pair of geometries, then groups by specified columns and unions the resulting intersection geometries.

Parameters:

Name Type Description Default
input_sdf DataFrame

Input DataFrame containing geometries to be processed. Must contain a 'geometry' column.

required
mask_sdf DataFrame

Mask DataFrame containing geometries that define the areas of interest. Must contain a 'geometry' column.

required
cols_to_keep List

List of column names from the input DataFrame to preserve in the output. These columns will be used as grouping keys.

required

Returns:

Name Type Description
DataFrame DataFrame

A DataFrame containing merged geometries that result from intersecting the input geometries with the mask geometries.

Source code in multimno/core/utils.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def merge_geom_within_mask_geom(
    input_sdf: DataFrame, mask_sdf: DataFrame, cols_to_keep: List, geometry_col: str
) -> DataFrame:
    """
    Intersects input geometries with mask geometries and merges the pieces per group.

    The input and mask DataFrames are joined spatially with ST_Intersects. For every matching
    pair the geometric intersection is computed; the intersections are then grouped by the
    given columns, unioned, and the union is dumped and exploded into one row per geometry.

    Args:
        input_sdf (DataFrame): Input DataFrame containing geometries to be processed.
        mask_sdf (DataFrame): Mask DataFrame containing geometries that define the areas of interest.
        cols_to_keep (List): List of column names to preserve in the output.
                            These columns are used as grouping keys.
        geometry_col (str): Name of the geometry column. It is read from both DataFrames
                            and used as the name of the output geometry column.

    Returns:
        DataFrame: A DataFrame with the grouping columns and the merged geometries that result
                  from intersecting the input geometries with the mask geometries.
    """
    input_geometry = f"a.{geometry_col}"
    mask_geometry = f"b.{geometry_col}"

    joined_sdf = input_sdf.alias("a").join(
        mask_sdf.alias("b"),
        STP.ST_Intersects(input_geometry, mask_geometry),
    )
    pieces_sdf = joined_sdf.withColumn("merge_geometry", STF.ST_Intersection(input_geometry, mask_geometry))

    # Collect the pieces of every group, then union them and split the result
    # back into individual geometries.
    grouped_sdf = pieces_sdf.groupBy(*cols_to_keep).agg(F.array_agg("merge_geometry").alias(geometry_col))
    merged_parts = F.explode(STF.ST_Dump(STF.ST_Union(geometry_col)))

    return grouped_sdf.withColumn(geometry_col, merged_parts)

project_to_crs(sdf, crs_in, crs_out, geometry_column='geometry')

Projects geometry to CRS.

Parameters:

Name Type Description Default
sdf DataFrame

Input DataFrame.

required
crs_in int

Input CRS.

required
crs_out int

Output CRS.

required

Returns:

Name Type Description
DataFrame DataFrame

DataFrame with geometry projected to cartesian CRS.

Source code in multimno/core/utils.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def project_to_crs(sdf: DataFrame, crs_in: int, crs_out: int, geometry_column="geometry") -> DataFrame:
    """
    Transforms the geometry column of a DataFrame from one CRS to another.

    Args:
        sdf (DataFrame): Input DataFrame.
        crs_in (int): EPSG code of the input CRS.
        crs_out (int): EPSG code of the output CRS.
        geometry_column (str, optional): Name of the geometry column. Defaults to "geometry".

    Returns:
        DataFrame: DataFrame with the geometry column transformed to the output CRS.
    """
    source_crs = F.lit(f"EPSG:{crs_in}")
    destination_crs = F.lit(f"EPSG:{crs_out}")

    projected_geometry = STF.ST_Transform(sdf[geometry_column], source_crs, destination_crs)
    return sdf.withColumn(geometry_column, projected_geometry)

spark_to_geopandas(df, epsg=None)

Convert a Spark DataFrame to a geopandas GeoDataFrame.

Parameters:

Name Type Description Default
df DataFrame

Spark DataFrame to convert.

required

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: GeoDataFrame with the same data as the input DataFrame.

Source code in multimno/core/utils.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def spark_to_geopandas(df: DataFrame, epsg: int = None) -> gpd.GeoDataFrame:
    """
    Turns a Spark DataFrame into a geopandas GeoDataFrame.

    Args:
        df (DataFrame): Spark DataFrame to convert.
        epsg (int, optional): EPSG code to set as the CRS of the result. If None, it is
            obtained with get_epsg_from_geometry_column. Defaults to None.

    Returns:
        gpd.GeoDataFrame: GeoDataFrame with the same data as the input DataFrame.
    """
    # Fall back to the EPSG code stored in the geometries when none is given.
    resolved_epsg = get_epsg_from_geometry_column(df) if epsg is None else epsg
    pandas_df = df.toPandas()

    return gpd.GeoDataFrame(pandas_df, crs=f"EPSG:{resolved_epsg}")