Skip to content

overture_data_ingestion

OvertureDataIngestion

Bases: Component

Source code in multimno/components/ingestion/spatial_data_ingestion/overture_data_ingestion.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
class OvertureDataIngestion(Component):
    """
    Ingest Overture Maps data into the landing area, one quadkey at a time.

    Five datasets are handled: transportation segments, buildings, land cover,
    land use and water. For every quadkey to process, each dataset is cut to
    the quadkey extent, passed through ``utils.project_to_crs`` and
    ``utils.fix_geometry``, tagged with the quadkey and written to its landing
    data object.
    """

    COMPONENT_ID = "OvertureDataIngestion"

    def __init__(self, general_config_path: str, component_config_path: str) -> None:
        """
        Set up the component and read its extraction settings.

        Args:
            general_config_path (str): Path to the general configuration file.
            component_config_path (str): Path to this component's configuration file.
        """
        super().__init__(general_config_path, component_config_path)

        section = self.COMPONENT_ID
        self.extent = self.config.geteval(section, "extent")
        self.extraction_quadkey_level = self.config.getint(section, "extraction_quadkey_level")
        self.quadkeys_to_process = self.config.geteval(section, "quadkeys_to_process")

    def initalize_data_objects(self):
        """
        Register the input and output data objects of the component.

        Inputs point at the theme/type sub-paths below the configured
        ``overture_url``; outputs point at the configured landing paths.
        When ``clear_destination_directory`` is enabled, every output path is
        deleted before any data is written.
        """
        self.transportation = "transportation"
        self.landuse = "landuse"
        self.landcover = "landcover"
        self.buildings = "buildings"
        self.water = "water"

        overture_url = self.config.get(self.COMPONENT_ID, "overture_url")

        # Source locations, listed in the order the inputs are registered.
        source_urls = {
            self.transportation: overture_url + "/theme=transportation/type=segment",
            self.buildings: overture_url + "/theme=buildings/type=building",
            self.landcover: overture_url + "/theme=base/type=land",
            self.landuse: overture_url + "/theme=base/type=land_use",
            self.water: overture_url + "/theme=base/type=water",
        }

        self.input_data_objects = {}
        for dataset, url in source_urls.items():
            self.input_data_objects[dataset] = LandingGeoParquetDataObject(self.spark, url, set_crs=False)

        self.clear_destination_directory = self.config.getboolean(
            OvertureDataIngestion.COMPONENT_ID, "clear_destination_directory"
        )

        # Config keys of the landing paths, listed in the order the outputs are registered.
        landing_keys = {
            self.transportation: "transportation_data_landing",
            self.landuse: "landuse_data_landing",
            self.landcover: "landcover_data_landing",
            self.buildings: "buildings_data_landing",
            self.water: "water_data_landing",
        }
        # Resolve every path first so a missing key fails before any output object is built.
        landing_paths = {
            dataset: self.config.get(CONFIG_LANDING_PATHS_KEY, key) for dataset, key in landing_keys.items()
        }

        self.output_data_objects = {}
        for dataset, path in landing_paths.items():
            self.output_data_objects[dataset] = LandingGeoParquetDataObject(self.spark, path, [ColNames.quadkey])

        if self.clear_destination_directory:
            for target in self.output_data_objects.values():
                self.logger.info(f"Clearing {target.default_path}")
                delete_file_or_folder(self.spark, target.default_path)

    def execute(self):
        """
        Run the ingestion: read the inputs, then transform and write per quadkey.

        If no quadkeys are configured, they are derived from the configured
        extent at ``extraction_quadkey_level``.
        """
        self.logger.info(f"Starting {self.COMPONENT_ID}...")
        self.read()

        if len(self.quadkeys_to_process) == 0:
            self.quadkeys_to_process = quadkey_utils.get_quadkeys_for_bbox(self.extent, self.extraction_quadkey_level)
        quadkeys = self.quadkeys_to_process
        total = len(quadkeys)
        self.logger.info(f"Extraction will be done in {total} parts.")

        self.logger.info("quadkeys to process: ")
        self.logger.info(f"{quadkeys}")
        for done, quadkey in enumerate(quadkeys, start=1):
            self.logger.info(f"Processing quadkey {quadkey}")
            # transform() picks up the current extent and quadkey from these attributes.
            self.current_extent = quadkey_utils.quadkey_to_extent(quadkey)
            self.current_quadkey = quadkey
            self.transform()
            self.logger.info(f"Processed {done} out of {total} quadkeys")
        self.logger.info(f"Finished {self.COMPONENT_ID}")

    def write(self, data_object_id: str):
        """
        Write the output data object registered under ``data_object_id``.

        Args:
            data_object_id (str): Key of the output data object to write.
        """
        self.logger.info(f"Writing {data_object_id} data object")
        self.output_data_objects[data_object_id].write()

    def transform(self):
        """
        Extract and write every dataset for the quadkey currently being processed.

        Datasets are processed and written one after another in a fixed order.
        For water, the ``subtype`` column is overwritten with the constant
        ``"water"``.
        """
        self.logger.info(f"Transform method {self.COMPONENT_ID}")

        segment_columns = ["class", "geometry", "subtype"]
        area_columns = ["subtype", "geometry"]

        # (dataset, columns to select, geometry type code, constant subtype or None)
        extraction_plan = (
            (self.transportation, segment_columns, 2, None),
            (self.landuse, area_columns, 3, None),
            (self.landcover, area_columns, 3, None),
            (self.water, area_columns, 3, "water"),
            (self.buildings, area_columns, 3, None),
        )

        for dataset, columns, geometry_type, constant_subtype in extraction_plan:
            dataset_sdf = self.get_raw_overture_data(dataset, columns, geometry_type)
            if constant_subtype is not None:
                dataset_sdf = dataset_sdf.withColumn("subtype", F.lit(constant_subtype))
            self.output_data_objects[dataset].df = dataset_sdf
            self.write(dataset)

    def get_raw_overture_data(self, data_type: str, cols_to_select, geometry_type: int) -> DataFrame:
        """
        Build the output DataFrame of one dataset for the current quadkey.

        The input is first reduced with ``filter_input_data_objects``, then
        passed to ``utils.project_to_crs`` with 4326 and 3035 (presumably the
        source and target EPSG codes) and to ``utils.fix_geometry``. Finally a
        quadkey column holding ``self.current_quadkey`` is added.

        Args:
            data_type (str): Key of the input data object to read.
            cols_to_select (list): Names of the columns to select from the input.
            geometry_type (int): Geometry type code handed to ``utils.fix_geometry``.

        Returns:
            DataFrame: The selected columns plus the quadkey column.
        """
        clipped_sdf = self.filter_input_data_objects(data_type, cols_to_select)
        projected_sdf = utils.project_to_crs(clipped_sdf, 4326, 3035)
        repaired_sdf = utils.fix_geometry(projected_sdf, geometry_type)
        return repaired_sdf.withColumn(ColNames.quadkey, F.lit(self.current_quadkey))

    def filter_input_data_objects(self, data_type: str, required_columns: List[str]) -> DataFrame:
        """
        Select columns from an input dataset and cut it to the current extent.

        The cut is delegated to ``utils.cut_geodata_to_extent`` with
        ``self.current_extent`` and 4326. No subtype or validity filtering is
        done here.

        Args:
            data_type (str): Key of the input data object to read.
            required_columns (list): Names of the columns to select.

        Returns:
            DataFrame: The selected columns, cut to the current extent.
        """
        selected_sdf = self.input_data_objects[data_type].df.select(*required_columns)
        return utils.cut_geodata_to_extent(selected_sdf, self.current_extent, 4326)

filter_input_data_objects(data_type, required_columns)

Selects the required columns from an input Overture Maps dataset and cuts it to the current extent.

This function selects the required columns from the input data object identified by data_type and cuts the data to the extent of the quadkey currently being processed (self.current_extent). It does not filter by subtype and does not filter out invalid polygons.

Parameters:

Name Type Description Default
data_type str

The type of data to filter and process. "landcover", "landuse", "transportation", "buildings", or "water".

required
required_columns list

A list of column names to select from the data. Each column name is a string.

required
Note: the docstring also lists category_col (str, the category column to filter by) and subtypes (list, the subtypes to keep), but the current method signature does not accept them and the method body performs no category or subtype filtering.

Returns:

Name Type Description
DataFrame DataFrame

A DataFrame containing the filtered and processed data.

Source code in multimno/components/ingestion/spatial_data_ingestion/overture_data_ingestion.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def filter_input_data_objects(self, data_type: str, required_columns: List[str]) -> DataFrame:
    """
    Select columns from an input dataset and cut it to the current extent.

    The cut is delegated to ``utils.cut_geodata_to_extent`` with
    ``self.current_extent`` and 4326. No subtype or validity filtering is
    done here.

    Args:
        data_type (str): Key of the input data object to read.
        required_columns (list): Names of the columns to select.

    Returns:
        DataFrame: The selected columns, cut to the current extent.
    """
    selected_sdf = self.input_data_objects[data_type].df.select(*required_columns)
    return utils.cut_geodata_to_extent(selected_sdf, self.current_extent, 4326)

get_raw_overture_data(data_type, cols_to_select, geometry_type)

Retrieves and processes Overture Maps raw data for one dataset and the quadkey currently being processed.

This function selects the requested columns and cuts the input data to the current extent (via filter_input_data_objects), projects the data with utils.project_to_crs (called with 4326 and 3035), fixes the geometry with utils.fix_geometry, and adds a quadkey column holding the current quadkey. The result is not persisted.

Parameters:

Name Type Description Default
data_type str

The type of land use data to retrieve.

required
cols_to_select list

Names of the columns to select from the input data object.

required
geometry_type int

Geometry type code passed on to utils.fix_geometry.

required

Note: filter_types and persist, which the docstring lists, are not parameters of the current signature.

Returns:

Name Type Description
DataFrame DataFrame

A DataFrame containing the processed land use data.

DataFrame

The DataFrame includes a subtype column and a geometry column.

Source code in multimno/components/ingestion/spatial_data_ingestion/overture_data_ingestion.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def get_raw_overture_data(self, data_type: str, cols_to_select, geometry_type: int) -> DataFrame:
    """
    Build the output DataFrame of one dataset for the current quadkey.

    The input is first reduced with ``filter_input_data_objects``, then
    passed to ``utils.project_to_crs`` with 4326 and 3035 (presumably the
    source and target EPSG codes) and to ``utils.fix_geometry``. Finally a
    quadkey column holding ``self.current_quadkey`` is added.

    Args:
        data_type (str): Key of the input data object to read.
        cols_to_select (list): Names of the columns to select from the input.
        geometry_type (int): Geometry type code handed to ``utils.fix_geometry``.

    Returns:
        DataFrame: The selected columns plus the quadkey column.
    """
    clipped_sdf = self.filter_input_data_objects(data_type, cols_to_select)
    projected_sdf = utils.project_to_crs(clipped_sdf, 4326, 3035)
    repaired_sdf = utils.fix_geometry(projected_sdf, geometry_type)
    return repaired_sdf.withColumn(ColNames.quadkey, F.lit(self.current_quadkey))