Skip to content

spatial_aggregation

This module is responsible for aggregation of the gridded indicators to geographical zones of interest.

SpatialAggregation

Bases: Component

This class is responsible for spatial aggregation of the gridded indicators to geographical zones of interest.

Source code in multimno/components/execution/spatial_aggregation/spatial_aggregation.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
class SpatialAggregation(Component):
    """
    Aggregates gridded indicators to geographical zones of interest.

    For every configured hierarchical level the component calls ``read``, ``transform`` and
    ``write`` in turn; ``transform`` maps grid tiles to the zone of the current level and sums
    the value columns of the configured output data object per zone.
    """

    COMPONENT_ID = "SpatialAggregation"

    def __init__(self, general_config_path: str, component_config_path: str) -> None:
        """
        Initialise the component and parse the zoning settings from the configuration.

        Args:
            general_config_path (str): path passed through to the base ``Component``.
            component_config_path (str): path passed through to the base ``Component``.

        Raises:
            ValueError: if any entry of the comma-separated ``hierarchical_levels`` option
                cannot be converted to an integer.
        """
        super().__init__(general_config_path, component_config_path)

        self.zoning_dataset_id = self.config.get(SpatialAggregation.COMPONENT_ID, "zonning_dataset_id")
        hierarchical_levels_raw = self.config.get(SpatialAggregation.COMPONENT_ID, "hierarchical_levels").split(",")

        self.hierarchical_levels = []
        # check levels are numeric, otherwise raise error
        for level in hierarchical_levels_raw:
            try:
                self.hierarchical_levels.append(int(level))
            except ValueError as err:
                # Keep the original conversion error attached as the cause for easier debugging.
                raise ValueError(f"Invalid hierarchical level: {level}") from err

        # Level currently being processed; set by execute() before each read/transform/write cycle.
        self.current_level = None

    def initalize_data_objects(self):
        """
        Build the input and output data objects for the configured aggregation type.

        The aggregation type selects an entry of ``CLASS_MAPPING`` whose ``"input"`` and
        ``"output"`` triples are used as (module path, class name, silver-path config key).

        Raises:
            ValueError: if the aggregation type is not a key of ``CLASS_MAPPING`` or if an
                input path does not exist.
        """
        # inputs
        self.clear_destination_directory = self.config.getboolean(
            SpatialAggregation.COMPONENT_ID, "clear_destination_directory"
        )

        self.aggregation_type = self.config.get(SpatialAggregation.COMPONENT_ID, "aggregation_type")

        if self.aggregation_type not in CLASS_MAPPING:
            raise ValueError(f"Invalid aggregation type: {self.aggregation_type}")

        # prepare input data objects to aggregate
        input_aggregation_do_params = CLASS_MAPPING[self.aggregation_type]["input"]
        self.input_aggregation_do = self.import_class(input_aggregation_do_params[0], input_aggregation_do_params[1])
        inputs = {
            input_aggregation_do_params[2]: self.input_aggregation_do,
            "geozones_grid_map_data_silver": SilverGeozonesGridMapDataObject,
        }

        self.input_data_objects = {}
        for key, value in inputs.items():
            path = self.config.get(CONFIG_SILVER_PATHS_KEY, key)
            if check_if_data_path_exists(self.spark, path):
                self.input_data_objects[value.ID] = value(self.spark, path)
            else:
                self.logger.warning(f"Expected path {path} to exist but it does not")
                raise ValueError(f"Invalid path for {value.ID}: {path}")

        # prepare output data objects
        output_do_params = CLASS_MAPPING[self.aggregation_type]["output"]
        self.output_aggregation_do = self.import_class(output_do_params[0], output_do_params[1])
        output_do_path = self.config.get(CONFIG_SILVER_PATHS_KEY, output_do_params[2])

        if self.clear_destination_directory:
            delete_file_or_folder(self.spark, output_do_path)

        self.output_data_objects = {}
        self.output_data_objects[self.output_aggregation_do.ID] = self.output_aggregation_do(self.spark, output_do_path)

    @staticmethod
    def import_class(class_path: str, class_name: str):
        """
        Import a module and return one of its attributes.

        Args:
            class_path (str): dotted path of the module to import.
            class_name (str): name of the attribute (class) to fetch from that module.

        Returns:
            The attribute named ``class_name`` of the imported module.
        """
        module = importlib.import_module(class_path)
        return getattr(module, class_name)

    @get_execution_stats
    def execute(self):
        """
        Run read, transform and write once for every configured hierarchical level.
        """
        self.logger.info(f"Starting {self.COMPONENT_ID}...")
        # iterate over each hierarchical level of zoning dataset
        for level in self.hierarchical_levels:
            self.logger.info(f"Starting aggregation for level {level} ...")
            self.current_level = level
            self.read()
            self.transform()
            self.write()
        self.logger.info(f"Finished {self.COMPONENT_ID}")

    def transform(self):
        """
        Aggregate the input data to the zones of ``self.current_level`` and store the result
        in the output data object.
        """
        self.logger.info(f"Transform method {self.COMPONENT_ID}...")

        # Keep only the grid-to-zone mapping rows of the configured zoning dataset.
        current_zoning_sdf = self.input_data_objects[SilverGeozonesGridMapDataObject.ID].df
        current_zoning_sdf = current_zoning_sdf.filter(
            current_zoning_sdf[ColNames.dataset_id].isin(self.zoning_dataset_id)
        ).select(ColNames.grid_id, ColNames.hierarchical_id, ColNames.zone_id, ColNames.dataset_id)

        current_input_sdf = self.input_data_objects[self.input_aggregation_do.ID].df

        # do aggregation
        aggregated_sdf = self.aggregate_to_zone(
            current_input_sdf, current_zoning_sdf, self.current_level, self.output_aggregation_do
        )

        aggregated_sdf = utils.apply_schema_casting(aggregated_sdf, self.output_aggregation_do.SCHEMA)

        self.output_data_objects[self.output_aggregation_do.ID].df = aggregated_sdf

    @staticmethod
    def aggregate_to_zone(
        sdf_to_aggregate: DataFrame, zone_to_grid_map_sdf: DataFrame, hierarchy_level: int, output_do: DataObject
    ) -> DataFrame:
        """
        Aggregate the input data to the desired hierarchical zone level.

        Args:
            sdf_to_aggregate (DataFrame): input data to aggregate.
            zone_to_grid_map_sdf (DataFrame): mapping of grid tiles to zones.
            hierarchy_level (int): desired hierarchical zone level, used as the position of the
                zone within the ``|``-separated hierarchical id.
            output_do (DataObject): output data object providing ``VALUE_COLUMNS`` (summed) and
                ``AGGREGATION_COLUMNS`` (grouping keys).

        Returns:
            DataFrame: the value columns summed per combination of aggregation columns.
        """
        # Override zone_id with the desired hierarchical zone level.
        # element_at uses 1-based positions, so level 1 is the first segment of the hierarchical id.
        zone_to_grid_map_sdf = zone_to_grid_map_sdf.withColumn(
            ColNames.zone_id,
            F.element_at(F.split(F.col(ColNames.hierarchical_id), pattern="\\|"), hierarchy_level),
        )
        zone_to_grid_map_sdf = zone_to_grid_map_sdf.withColumn(ColNames.level, F.lit(hierarchy_level))

        # Default (inner) join: input rows whose grid_id has no zone mapping are dropped.
        sdf_to_aggregate = sdf_to_aggregate.join(zone_to_grid_map_sdf, on=ColNames.grid_id)

        # potentially different aggregation functions can be used
        agg_expressions = [F.sum(F.col(col)).alias(col) for col in output_do.VALUE_COLUMNS]
        aggregated_sdf = sdf_to_aggregate.groupBy(*output_do.AGGREGATION_COLUMNS).agg(*agg_expressions)

        return aggregated_sdf

aggregate_to_zone(sdf_to_aggregate, zone_to_grid_map_sdf, hierarchy_level, output_do) staticmethod

This method aggregates the input data to the desired hierarchical zone level

Parameters:

Name Type Description Default
sdf_to_aggregate DataFrame

DataFrame - input data to aggregate

required
zone_to_grid_map_sdf DataFrame

DataFrame - mapping of grid tiles to zones

required
hierarchy_level int

int - desired hierarchical zone level

required
output_do DataObject

DataObject - output data object

required

Returns:

Name Type Description
aggregated_sdf DataFrame

DataFrame - aggregated data

Source code in multimno/components/execution/spatial_aggregation/spatial_aggregation.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
@staticmethod
def aggregate_to_zone(
    sdf_to_aggregate: DataFrame, zone_to_grid_map_sdf: DataFrame, hierarchy_level: int, output_do: DataObject
) -> DataFrame:
    """
    Aggregate the input data to the desired hierarchical zone level.

    Args:
        sdf_to_aggregate (DataFrame): input data to aggregate.
        zone_to_grid_map_sdf (DataFrame): mapping of grid tiles to zones.
        hierarchy_level (int): desired hierarchical zone level, used as the position of the
            zone within the ``|``-separated hierarchical id.
        output_do (DataObject): output data object providing ``VALUE_COLUMNS`` (summed) and
            ``AGGREGATION_COLUMNS`` (grouping keys).

    Returns:
        DataFrame: the value columns summed per combination of aggregation columns.
    """
    # Pick the zone identifier found at the requested position of the hierarchical id
    # and use it to replace zone_id; also tag every mapping row with the level.
    hierarchy_parts = F.split(F.col(ColNames.hierarchical_id), pattern="\\|")
    zone_at_level = F.element_at(hierarchy_parts, hierarchy_level)
    leveled_zones_sdf = zone_to_grid_map_sdf.withColumn(ColNames.zone_id, zone_at_level).withColumn(
        ColNames.level, F.lit(hierarchy_level)
    )

    # Attach the zone information to every input row through its grid tile.
    joined_sdf = sdf_to_aggregate.join(leveled_zones_sdf, on=ColNames.grid_id)

    # One sum per value column; other aggregation functions could be plugged in here.
    value_sums = []
    for value_column in output_do.VALUE_COLUMNS:
        value_sums.append(F.sum(F.col(value_column)).alias(value_column))

    return joined_sdf.groupBy(*output_do.AGGREGATION_COLUMNS).agg(*value_sums)