53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
class SpatialAggregation(Component):
    """
    This class is responsible for spatial aggregation of the gridded indicators to geographical zones of interest.

    For every level listed in the ``hierarchical_levels`` config option, the component reads its inputs,
    maps each grid tile to the zone at that level of the configured zoning dataset, sums the output data
    object's ``VALUE_COLUMNS`` grouped by its ``AGGREGATION_COLUMNS`` and writes the result.
    The concrete input/output data object classes are looked up in ``CLASS_MAPPING`` using the
    ``aggregation_type`` config option.
    """

    COMPONENT_ID = "SpatialAggregation"

    def __init__(self, general_config_path: str, component_config_path: str) -> None:
        super().__init__(general_config_path, component_config_path)

        # NOTE: the config key is spelled "zonning_dataset_id"; it is kept as-is so existing
        # configuration files keep working.
        self.zoning_dataset_id = self.config.get(SpatialAggregation.COMPONENT_ID, "zonning_dataset_id")

        hierarchical_levels_raw = self.config.get(SpatialAggregation.COMPONENT_ID, "hierarchical_levels").split(",")
        self.hierarchical_levels = []
        # Check levels are numeric (int() tolerates surrounding whitespace), otherwise raise an error.
        for level in hierarchical_levels_raw:
            try:
                parsed_level = int(level)
            except ValueError as err:
                raise ValueError(f"Invalid hierarchical level: {level}") from err
            # Levels are used as indices into the "|"-separated hierarchical_id via F.element_at,
            # which is 1-based and always errors on index 0. Fail fast here with a clear message
            # instead of failing later inside Spark.
            if parsed_level == 0:
                raise ValueError(f"Invalid hierarchical level: {level}")
            self.hierarchical_levels.append(parsed_level)

        # Level currently being processed by execute(); read by transform().
        self.current_level = None

    def initalize_data_objects(self):
        """
        Resolve and instantiate the input and output data objects for the configured aggregation type.

        Each ``CLASS_MAPPING[aggregation_type]["input"/"output"]`` entry is used here as
        ``(module_path, class_name, silver_path_config_key)``.

        The (misspelled) method name is kept unchanged. NOTE(review): presumably it is the hook name
        the ``Component`` base class calls; confirm before renaming.

        Raises:
            ValueError: if ``aggregation_type`` is not a key of ``CLASS_MAPPING`` or if an input
                path does not exist.
        """
        # inputs
        self.clear_destination_directory = self.config.getboolean(
            SpatialAggregation.COMPONENT_ID, "clear_destination_directory"
        )
        self.aggregation_type = self.config.get(SpatialAggregation.COMPONENT_ID, "aggregation_type")
        if self.aggregation_type not in CLASS_MAPPING:
            raise ValueError(f"Invalid aggregation type: {self.aggregation_type}")

        # prepare input data objects to aggregate
        input_aggregation_do_params = CLASS_MAPPING[self.aggregation_type]["input"]
        self.input_aggregation_do = self.import_class(input_aggregation_do_params[0], input_aggregation_do_params[1])
        inputs = {
            input_aggregation_do_params[2]: self.input_aggregation_do,
            "geozones_grid_map_data_silver": SilverGeozonesGridMapDataObject,
        }
        self.input_data_objects = {}
        for key, value in inputs.items():
            path = self.config.get(CONFIG_SILVER_PATHS_KEY, key)
            if check_if_data_path_exists(self.spark, path):
                self.input_data_objects[value.ID] = value(self.spark, path)
            else:
                self.logger.warning(f"Expected path {path} to exist but it does not")
                raise ValueError(f"Invalid path for {value.ID}: {path}")

        # prepare output data objects
        output_do_params = CLASS_MAPPING[self.aggregation_type]["output"]
        self.output_aggregation_do = self.import_class(output_do_params[0], output_do_params[1])
        output_do_path = self.config.get(CONFIG_SILVER_PATHS_KEY, output_do_params[2])
        # Optionally wipe the destination before any level is processed.
        if self.clear_destination_directory:
            delete_file_or_folder(self.spark, output_do_path)
        self.output_data_objects = {}
        self.output_data_objects[self.output_aggregation_do.ID] = self.output_aggregation_do(self.spark, output_do_path)

    @staticmethod
    def import_class(class_path: str, class_name: str):
        """
        Import module ``class_path`` and return its attribute ``class_name``.

        Raises:
            ModuleNotFoundError: if the module cannot be imported.
            AttributeError: if the module has no attribute ``class_name``.
        """
        module = importlib.import_module(class_path)
        return getattr(module, class_name)

    @get_execution_stats
    def execute(self):
        """Run read -> transform -> write once per configured hierarchical level."""
        self.logger.info(f"Starting {self.COMPONENT_ID}...")
        # iterate over each hierarchical level of zoning dataset
        for level in self.hierarchical_levels:
            self.logger.info(f"Starting aggregation for level {level} ...")
            self.current_level = level
            self.read()
            self.transform()
            self.write()
        self.logger.info(f"Finished {self.COMPONENT_ID}")

    def transform(self):
        """
        Aggregate the input indicator data to zones of ``self.current_level``.

        The grid-to-zone mapping is restricted to the configured zoning dataset. The aggregated
        result is cast to the output data object's schema and stored as the output DataFrame.
        """
        self.logger.info(f"Transform method {self.COMPONENT_ID}...")

        current_zoning_sdf = self.input_data_objects[SilverGeozonesGridMapDataObject.ID].df
        # zoning_dataset_id is a single string, so isin() matches exactly that one dataset id.
        current_zoning_sdf = current_zoning_sdf.filter(
            current_zoning_sdf[ColNames.dataset_id].isin(self.zoning_dataset_id)
        ).select(ColNames.grid_id, ColNames.hierarchical_id, ColNames.zone_id, ColNames.dataset_id)

        current_input_sdf = self.input_data_objects[self.input_aggregation_do.ID].df

        # do aggregation
        aggregated_sdf = self.aggregate_to_zone(
            current_input_sdf, current_zoning_sdf, self.current_level, self.output_aggregation_do
        )

        aggregated_sdf = utils.apply_schema_casting(aggregated_sdf, self.output_aggregation_do.SCHEMA)

        self.output_data_objects[self.output_aggregation_do.ID].df = aggregated_sdf

    @staticmethod
    def aggregate_to_zone(
        sdf_to_aggregate: DataFrame, zone_to_grid_map_sdf: DataFrame, hierarchy_level: int, output_do: DataObject
    ) -> DataFrame:
        """
        This method aggregates the input data to the desired hierarchical zone level.

        args:
            sdf_to_aggregate: DataFrame - input data to aggregate (must contain ``grid_id``)
            zone_to_grid_map_sdf: DataFrame - mapping of grid tiles to zones (must contain ``grid_id``
                and a "|"-separated ``hierarchical_id``)
            hierarchy_level: int - desired hierarchical zone level (1-based position in ``hierarchical_id``)
            output_do: DataObject - output data object class providing ``VALUE_COLUMNS`` and
                ``AGGREGATION_COLUMNS``
        returns:
            DataFrame - input rows inner-joined to the mapping on ``grid_id``, grouped by
            ``output_do.AGGREGATION_COLUMNS``, with each of ``output_do.VALUE_COLUMNS`` summed
        """
        # Override zone_id with the zone at the desired hierarchical level (element_at is 1-based).
        zone_to_grid_map_sdf = zone_to_grid_map_sdf.withColumn(
            ColNames.zone_id,
            F.element_at(F.split(F.col(ColNames.hierarchical_id), pattern="\\|"), hierarchy_level),
        )
        zone_to_grid_map_sdf = zone_to_grid_map_sdf.withColumn(ColNames.level, F.lit(hierarchy_level))

        # Inner join: grid tiles without a zone mapping are dropped.
        sdf_to_aggregate = sdf_to_aggregate.join(zone_to_grid_map_sdf, on=ColNames.grid_id)

        # potentially different aggregation functions can be used
        agg_expressions = [F.sum(F.col(col)).alias(col) for col in output_do.VALUE_COLUMNS]

        aggregated_sdf = sdf_to_aggregate.groupBy(*output_do.AGGREGATION_COLUMNS).agg(*agg_expressions)

        return aggregated_sdf
|