Commit f1fe00cb authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Added TileExposure and ExportHandler classes

parent f61f6d2b
Pipeline #45182 passed with stage
in 2 minutes and 36 seconds
......@@ -14,9 +14,11 @@ geographic_selection: # Selection of the geographic area for which GDE will be
lon_e: 23.713597
lat_s: 37.965450
lat_n: 37.972561
export_OBM_footprints: True # If True, geometries of OBM buildings will be exported
database_gde_tiles: # Database where info on the GDE tiles is stored
host: localhost
dbname: gde_tiles_attica_2022_04_12_0900
port: 5432
username: tester
password: somepass
number_cores: 1 # Number of cores used for parallelisation
......@@ -88,6 +88,21 @@ class Configuration:
"lon_e" (float): East-most longitude.
"lat_s" (float): South-most latitude.
"lat_n" (float): North-most latitude.
self.cost_cases (dict):
Dictionary containing indications on the sort of costs to retrieve. The minimum
number of keys is one. The sort of costs that are available are: structural,
non_structural, contents and total. The keys are the names as they will appear in
the output, the values refer to the intrinsic naming in the model (i.e. the way
values are stored in the database).
self.people_cases (dict):
Dictionary containing indications on the time of the day for which the number of
people in the buildings is to be output. The minimum number of keys is one. The
available options are: day, night, transit, census and average. The keys are the
names as they will appear in the output, the values refer to the intrinsic naming in
the model (i.e. the way values are stored in the database).
self.export_OBM_footprints (bool):
If True, the geometries of OpenBuildingMap buildings will be retrieved and exported,
if False, they will not.
self.database_gde_tiles (dict):
Dictionary containing the credentials needed to connect to the SQL database in which
information on the GDE tiles is stored. The exact parameters needed depend on the
......@@ -111,6 +126,8 @@ class Configuration:
- "bounding_box"
self.number_quadkeys_to_process (int):
Total number of quadkeys to process (from all keys of self.quadkeys_to_process).
self.number_cores (int):
Number of cores that will be used to run the code.
"""
REQUIRES = [
......@@ -119,7 +136,11 @@ class Configuration:
"exposure_entities_to_run",
"exposure_entities_code",
"geographic_selection",
"cost_cases",
"people_cases",
"export_OBM_footprints",
"database_gde_tiles",
"number_cores",
]
def __init__(self, filepath, force_config_over_hierarchies=False):
......@@ -171,6 +192,20 @@ class Configuration:
)
self.interpret_geographic_selection()
self.cost_cases = ConfigurationMethods.assign_hierarchical_parameters(
config, "cost_cases"
)
self.validate_cost_cases()
self.people_cases = ConfigurationMethods.assign_hierarchical_parameters(
config, "people_cases"
)
self.validate_people_cases()
self.export_OBM_footprints = ConfigurationMethods.assign_boolean_parameter(
config, "export_OBM_footprints"
)
self.database_gde_tiles = ConfigurationMethods.retrieve_database_credentials(
config,
"database_gde_tiles",
......@@ -179,6 +214,10 @@ class Configuration:
force_config_over_hierarchies,
)
self.number_cores = ConfigurationMethods.assign_integer_parameter(
config, "number_cores"
)
self.quadkeys_to_process = None
self.number_quadkeys_to_process = None
......@@ -403,8 +442,15 @@ class Configuration:
"data_unit_tiles",
)
)
quadkeys_to_process[exposure_entity_code] = quadkeys_list
number_quadkeys += len(quadkeys_list)
if len(quadkeys_list) > 0:
quadkeys_to_process[exposure_entity_code] = quadkeys_list
number_quadkeys += len(quadkeys_list)
else:
logger.info(
"No quadkeys found for exposure entity '%s', skipping"
% (exposure_entity_code)
)
if self.geographic_selection["selection_mode"].lower() == "data_unit_id":
quadkeys_to_process = {}
......@@ -418,8 +464,14 @@ class Configuration:
"data_unit_tiles",
)
)
quadkeys_to_process[data_unit_id] = quadkeys_list
number_quadkeys += len(quadkeys_list)
if len(quadkeys_list) > 0:
quadkeys_to_process[data_unit_id] = quadkeys_list
number_quadkeys += len(quadkeys_list)
else:
logger.info(
"No quadkeys found for data unit '%s', skipping" % (data_unit_id)
)
if self.geographic_selection["selection_mode"].lower() == "quadkeys":
# Retrieve quadkeys from the indicated file
......@@ -432,9 +484,16 @@ class Configuration:
quadkeys_list.append(element)
f.close()
quadkeys_list = list(dict.fromkeys(quadkeys_list))
quadkeys_to_process = {"quadkeys_list": quadkeys_list}
number_quadkeys = len(quadkeys_list)
if len(quadkeys_list) > 0:
quadkeys_to_process = {"quadkeys_list": quadkeys_list}
else:
logger.info(
"No quadkeys found in '%s'" % (self.geographic_selection["quadkeys_file"])
)
quadkeys_to_process = {}
if self.geographic_selection["selection_mode"].lower() == "bounding_box":
tiles = list(
mercantile.tiles(
......@@ -445,11 +504,63 @@ class Configuration:
18,
)
)
quadkeys_list = list([mercantile.quadkey(tile) for tile in tiles])
quadkeys_to_process = {"bounding_box": quadkeys_list}
number_quadkeys = len(quadkeys_list)
if len(quadkeys_list) > 0:
quadkeys_to_process = {"bounding_box": quadkeys_list}
else:
logger.info("No quadkeys found in bounding box")
quadkeys_to_process = {}
self.quadkeys_to_process = quadkeys_to_process
self.number_quadkeys_to_process = number_quadkeys
return
def validate_cost_cases(self):
    """
    This function filters self.cost_cases in place so that only entries whose value is a
    cost case supported by this software remain. The supported values are: "structural",
    "non_structural", "contents" and "total". Each entry with any other value is reported
    through a logged warning and then deleted from self.cost_cases.
    """

    supported = ("structural", "non_structural", "contents", "total")

    # Identify the offending keys before deleting anything: a dictionary cannot change
    # size while it is being iterated over
    rejected = [name for name, value in self.cost_cases.items() if value not in supported]

    for name in rejected:
        logger.warning(
            "Invalid cost case found in configuration file: "
            "cost case '%s':'%s' will be ignored" % (name, self.cost_cases[name])
        )

    for name in rejected:
        del self.cost_cases[name]
def validate_people_cases(self):
    """
    This function filters self.people_cases in place so that only entries whose value is a
    people case supported by this software remain. The values accepted by the code are:
    "day", "night", "transit", "census" and "average". Each entry with any other value is
    reported through a logged warning and then removed from self.people_cases.
    """

    supported = ("day", "night", "transit", "census", "average")

    # Gather the keys to discard first, so that the dictionary is not modified while it
    # is being traversed
    rejected = []
    for name, value in self.people_cases.items():
        if value in supported:
            continue
        logger.warning(
            "Invalid people case found in configuration file: "
            "people case '%s':'%s' will be ignored" % (name, value)
        )
        rejected.append(name)

    for name in rejected:
        del self.people_cases[name]
This diff is collapsed.
......@@ -18,8 +18,11 @@
import logging
import sys
from multiprocessing import Pool
from functools import partial
from gdeexporter.configuration import Configuration
from gdeexporter.database_queries import DatabaseQueries
from gdeexporter.handler import ExportHandler
# Add a logger printing error, warning, info and debug messages to the screen
logger = logging.getLogger()
......@@ -78,11 +81,25 @@ def main():
logger.info("%s quadkeys will be processed" % (config.number_quadkeys_to_process))
for quadkeys_group in config.quadkeys_to_process.keys():
logger.info(
"Processing of %s quadkeys from quadkey group '%s' has started"
% (len(config.quadkeys_to_process[quadkeys_group]), quadkeys_group)
# Create groups of quadkey groups and occupancies, so as to parallelise
if config.number_quadkeys_to_process > 0:
quadkeys_occupancy_groups = [
(quadkeys_group, occupancy_case)
for quadkeys_group in config.quadkeys_to_process.keys()
for occupancy_case in config.occupancies_to_run
]
p = Pool(processes=config.number_cores)
func = partial(
ExportHandler.process_quadkey_occupancy_group,
config,
aggregated_source_id,
)
summary_values = p.map(func, quadkeys_occupancy_groups)
p.close()
p.join()
print(summary_values)
# Leave the program
logger.info("gde-exporter has finished")
......
#!/usr/bin/env python3
# Copyright (C) 2022:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import logging
from gdeexporter.tileexposure import TileExposure
from gdeexporter.database_queries import DatabaseQueries
logger = logging.getLogger()
class ExportHandler:
"""This class handles the main processing activities of the gde-exporter."""
@staticmethod
def process_quadkey_occupancy_group(config, aggregated_source_id, group_attributes):
"""
This function processes a particular quadkey group and occupancy case, both of which are
passed as arguments under 'group_attributes', to enable parallelisation.
Args:
config (Configuration):
Instance of the gdeexporter.configuration.Configuration class.
aggregated_source_id (int):
ID of the source of the aggregated exposure model for which the processing will
take place.
group_attributes (tuple of (quadkeys_group, occupancy_case)):
Tuple with two elements:
quadkeys_group:
Name of the quadkey group for which the processing will take place. It
needs to be a key of the config.quadkeys_to_process dictionary. The
content of config.quadkeys_to_process[quadkeys_group] is a list of
quadkeys.
occupancy_case (str):
Occupancy case for which the processing will take place.
Returns:
summary_values (dict):
Dictionary summarising the number of buildings processed for the input quadkey
group and occupancy case, with the following keys:
processed_quadkeys (int):
Number of quadkeys processed (includes quadkeys with no buildings).
OBM_buildings (int):
Number of GDE-processed OBM buildings.
aggregated_buildings (float):
Number of aggregated buildings.
remainder_buildings (float):
Number of remainder buildings.
total_buildings (float):
Number of total buildings (remainder plus OBM).
"""
# Unpack the single tuple argument (quadkeys_group, occupancy_case) described in the
# docstring
quadkeys_group = group_attributes[0]
occupancy_case = group_attributes[1]
logger.info(
"Processing of %s quadkeys from group '%s' and occupancy case '%s' has started"
% (len(config.quadkeys_to_process[quadkeys_group]), quadkeys_group, occupancy_case)
)
# Running totals for this quadkey group and occupancy case (this is what is returned)
summary_values = {}
summary_values["processed_quadkeys"] = 0
summary_values["OBM_buildings"] = 0
summary_values["aggregated_buildings"] = 0.0
summary_values["remainder_buildings"] = 0.0
summary_values["total_buildings"] = 0.0
# Assemble the exposure of one tile (quadkey) of the group at a time
for quadkey in config.quadkeys_to_process[quadkeys_group]:
quadtile = TileExposure(quadkey, config.cost_cases, config.people_cases)
# In "data_unit_id" selection mode the name of the quadkeys group is used as the only
# data unit ID; in any other mode the data unit IDs are queried for this quadkey
if config.geographic_selection["selection_mode"].lower() == "data_unit_id":
data_unit_ids = [quadkeys_group]
else:
data_unit_ids = DatabaseQueries.retrieve_data_unit_ids(
quadkey,
aggregated_source_id,
config.exposure_entities_to_run,
occupancy_case,
config.database_gde_tiles,
"data_unit_tiles",
)
for data_unit_id in data_unit_ids:
# Retrieve building classes associated with this data unit, occupancy case and
# aggregated source ID
building_classes = DatabaseQueries.get_building_classes_of_data_unit(
data_unit_id,
occupancy_case,
aggregated_source_id,
config.database_gde_tiles,
"data_units_buildings",
)
# NOTE(review): the exposure entity code is taken as the first three characters of the
# data unit ID -- presumably an ISO3-style prefix; confirm the data unit ID convention
exposure_entity_code = data_unit_id[:3]
# Retrieve cost assumptions
cost_assumptions = DatabaseQueries.get_exposure_entities_costs_assumptions(
config.cost_cases,
exposure_entity_code,
occupancy_case,
aggregated_source_id,
config.database_gde_tiles,
"exposure_entities_costs_assumptions",
)
# Retrieve distribution of people at different times of the day
people_distribution = (
DatabaseQueries.get_exposure_entities_population_time_distribution(
config.people_cases,
exposure_entity_code,
occupancy_case,
aggregated_source_id,
config.database_gde_tiles,
"exposure_entities_population_time_distribution",
)
)
# Retrieve number of aggregated, OBM and remainder buildings in the tile
(
number_aggregated,
number_obm,
number_remainder,
) = DatabaseQueries.get_numbers_buildings_for_data_unit_tile(
quadkey,
aggregated_source_id,
occupancy_case,
data_unit_id,
config.database_gde_tiles,
"data_unit_tiles",
)
# NOTE(review): values below -1.0 are treated as a sign that the query failed
# (presumably a negative sentinel is returned -- confirm in DatabaseQueries). The problem
# is only logged; negative numbers of aggregated/remainder buildings do not pass the
# "> 1e-6" checks below, so no lumped buildings are appended for them.
if number_aggregated < -1.0 or number_obm < -1.0 or number_remainder < -1.0:
logger.error(
"get_numbers_buildings_for_data_unit_tile could not retrieve number "
"of aggregated, remainder and OBM buildings for quadkey = '%s' and "
"data unit ID = '%s'" % (quadkey, data_unit_id)
)
# Append aggregated buildings to quadtile.aggregated_buildings
if number_aggregated > 1e-6: # If smaller, consider equal to zero
quadtile.append_lumped_buildings(
"aggregated_buildings",
building_classes,
number_aggregated,
cost_assumptions,
people_distribution,
data_unit_id,
)
# Append remainder buildings to quadtile.remainder_buildings
if number_remainder > 1e-6: # If smaller, consider equal to zero
quadtile.append_lumped_buildings(
"remainder_buildings",
building_classes,
number_remainder,
cost_assumptions,
people_distribution,
data_unit_id,
)
# Retrieve OBM buildings
obm_buildings, obm_geometries = DatabaseQueries.get_GDE_buildings(
quadkey,
data_unit_id,
occupancy_case,
aggregated_source_id,
config.export_OBM_footprints,
config.database_gde_tiles,
"gde_buildings",
)
if obm_buildings.shape[0] > 0:
# Append OBM buildings to quadtile.obm_buildings
quadtile.append_OBM_buildings(
obm_buildings,
building_classes,
cost_assumptions,
people_distribution,
data_unit_id,
)
# Append obm_geometries to quadtile.obm_buildings_geometries (dictionary)
quadtile.obm_buildings_geometries.update(obm_geometries)
# Add to summary values
# NOTE(review): nothing in this function writes to quadtile.total_buildings directly;
# presumably the append_* methods of TileExposure keep it up to date -- confirm
summary_values["aggregated_buildings"] += (
quadtile.aggregated_buildings["number"].to_numpy().sum()
)
summary_values["remainder_buildings"] += (
quadtile.remainder_buildings["number"].to_numpy().sum()
)
summary_values["total_buildings"] += (
quadtile.total_buildings["number"].to_numpy().sum()
)
summary_values["OBM_buildings"] += quadtile.obm_buildings["number"].to_numpy().sum()
# The number of processed quadkeys is the size of the whole quadkeys group
summary_values["processed_quadkeys"] += len(config.quadkeys_to_process[quadkeys_group])
return summary_values
#!/usr/bin/env python3
# Copyright (C) 2022:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import logging
import pandas
from copy import deepcopy
logger = logging.getLogger()
# Template with the columns shared by all building tables of a tile (it contains no rows)
BUILDINGS = pandas.DataFrame(
    {
        column_name: pandas.Series(dtype=column_dtype)
        for column_name, column_dtype in (
            ("building_class_name", "str"),
            ("number", "float"),
            ("data_unit_id", "str"),
        )
    }
)
class TileExposure:
"""This class represents the exposure of a tile of zoom level 18.
Attributes:
self.quadkey (str):
Quadkey of the zoom level 18 tile.
self.obm_buildings (Pandas DataFrame):
DataFrame with the OBM buildings that belong to the tile, in terms of:
osm_id (int):
OpenStreetMap ID of the building.
building_class_name (str):
Name of the building class as per the GEM Building Taxonomy v3.0.
number (float):
Probability of the building (identified by its OSM ID) belonging to the
building class.
Columns associated with building replacement costs (float):
Names and contents are user-defined. Values correspond to values per
building multiplied by the probability of the building class corresponding
to the particular building (identified by its OSM ID).
Columns associated with the number of people in the building at different times
of the day (float):
Names and contents are user-defined. Values correspond to values per
building multiplied by the probability of the building class corresponding
to the particular building (identified by its OSM ID).
data_unit_id (str):
ID of the data unit the building belongs to.
self.obm_buildings_geometries (dict):
Dictionary in which each key is a unique 'osm_id' from self.obm_buildings, with the
following subkeys:
centroid (str):
Centroid of the OBM building in Well-Known Text format.
footprint (str) (only if instructed to retrieve footprints by the user):
Footprint of the OBM building in Well-Known Text format.
self.remainder_buildings (Pandas DataFrame):
DataFrame with the remainder buildings that belong to the tile, in terms of:
building_class_name (str):
Name of the building class as per the GEM Building Taxonomy v3.0.
number (float):
Number of buildings of this building class.
Columns associated with building replacement costs (float):
Names and contents are user-defined. Values correspond to values per
building multiplied by the number of buildings of the class.
Columns associated with the number of people in the building at different times
of the day (float):
Names and contents are user-defined. Values correspond to values per
building multiplied by the number of buildings of the class.
data_unit_id (str):
ID of the data unit the buildings belong to.
self.aggregated_buildings (Pandas DataFrame):
DataFrame with the aggregated buildings that belong to the tile, in terms of the same
fields described for self.remainder_buildings.
self.total_buildings (Pandas DataFrame):
DataFrame with the total buildings that belong to the tile (aggregation of remainder
and OBM buildings), in terms of the same fields described for
self.remainder_buildings.
"""
def __init__(self, quadkey, cost_cases, people_cases):
    """
    This function initialises the exposure of a tile: the quadkey is stored, the dictionary
    of OBM building geometries starts empty, and all building DataFrames are created with
    their columns defined but no rows.

    Args:
        quadkey (str):
            Quadkey of the zoom level 18 tile.
        cost_cases (dict):
            Dictionary containing indications on the sort of costs to output.
        people_cases (dict):
            Dictionary containing indications on the time of the day for which the number of
            people in the buildings is to be output.
    """

    new_table = self._create_empty_building_dataframes

    self.quadkey = quadkey

    # OBM buildings carry an additional "osm_id" column
    osm_id_column = {"osm_id": "str"}
    self.obm_buildings = new_table(cost_cases, people_cases, additional_cols=osm_id_column)
    self.obm_buildings_geometries = {}

    # Remainder, aggregated and total buildings all share the same columns
    self.remainder_buildings = new_table(cost_cases, people_cases)
    self.aggregated_buildings = new_table(cost_cases, people_cases)
    self.total_buildings = new_table(cost_cases, people_cases)
def _create_empty_building_dataframes(self, cost_cases, people_cases, additional_cols=None):
    """
    This function creates a DataFrame with no rows, whose columns are those of the
    module-level BUILDINGS template followed by one column per cost case, one column per
    people case and any additional column requested (in that order).

    Args:
        cost_cases (dict):
            Dictionary containing indications on the sort of costs to output. Each key
            becomes a column of data type float.
        people_cases (dict):
            Dictionary containing indications on the time of the day for which the number of
            people in the buildings is to be output. Each key becomes a column of data type
            float.
        additional_cols (dict or None):
            Dictionary containing names (keys) and data types (values) of any other column
            that the output is required to have. Default: None (no additional columns).

    Returns:
        empty_buildings (Pandas DataFrame):
            DataFrame without rows and with the columns described above. It is an
            independent copy: the BUILDINGS template is not modified.
    """

    # None is used as default instead of a dictionary literal, as a default dictionary
    # would be one single object shared by all calls
    if additional_cols is None:
        additional_cols = {}

    empty_buildings = deepcopy(BUILDINGS)

    for cost_case in cost_cases:
        empty_buildings[cost_case] = pandas.Series(dtype="float")

    for people_case in people_cases:
        empty_buildings[people_case] = pandas.Series(dtype="float")

    for col, col_dtype in additional_cols.items():
        empty_buildings[col] = pandas.Series(dtype=col_dtype)

    return empty_buildings
def append_lumped_buildings(
self,
lumped_building_case,
building_classes,
number_buildings,
cost_assumptions,
people_distribution,
data_unit_id,
):
"""
This function appends buildings to the case of lumped buildings indicated by
'lumped_building_case', which can be either "aggregated_buildings" or
"remainder_buildings". The building classes and their proportions are as indicated in
'building_classes' and the total number of aggregated or remainder buildings is
indicated by 'number_buildings'. The dictionaries 'cost_assumptions' and
'people_distribution' indicate the desired disaggregation of replacement costs and
distribution of people at different times of the day. The output costs and number of
people correspond to the total number of buildings.
Args: