Commit 3ec46122 authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Changed parallelisation level from data-unit tiles to data units

parent cda75f10
Pipeline #43001 passed with stage
in 2 minutes and 14 seconds
......@@ -793,15 +793,15 @@ class DatabaseQueries:
return data_unit_tiles
@staticmethod
def get_automatic_completeness_of_quadkey(quadkey, db_completeness_config, db_table):
"""This function retrieves the completeness status of a tile with respect to the
representation of buildings in OpenStreetMap (OSM). If a tile is OSM-complete, then all
buildings that exist in reality are represented in OSM. If a tile is OSM-incomplete,
def get_automatic_completeness_of_quadkeys(quadkeys, db_completeness_config, db_table):
"""This function retrieves the completeness status of a series of tiles with respect to
the representation of buildings in OpenStreetMap (OSM). If a tile is OSM-complete, then
all buildings that exist in reality are represented in OSM. If a tile is OSM-incomplete,
then some buildings that exist in reality are not yet represented in OSM.
Args:
quadkey (str):
Quadkey of the tile for which the completeness status will be retrieved.
quadkeys (array of str):
Quadkeys of the tiles for which the completeness status will be retrieved.
db_completeness_config (dict):
Dictionary containing the credentials needed to connect to the SQL database in
which completeness data are stored. The keys of the dictionary need to be:
......@@ -828,8 +828,9 @@ class DatabaseQueries:
ID of the source used to define the built-up area and completeness.
Returns:
completeness (bool):
True if the tile is OSM-complete, False if it is OSM-incomplete.
completeness (array of bool):
True if the tile is OSM-complete, False if it is OSM-incomplete. Each element
corresponds to one element of 'quadkeys'.
"""
sql_query = "SELECT completeness FROM %s WHERE (quadkey='%s' AND source_id=%s);"
......@@ -837,28 +838,31 @@ class DatabaseQueries:
db_completeness = Database(**db_completeness_config)
db_completeness.create_connection_and_cursor()
db_completeness.cursor.execute(
sql_query % (db_table, quadkey, db_completeness_config["sourceid"])
)
exec_result = db_completeness.cursor.fetchall()
completeness_codes = numpy.zeros([len(quadkeys)], dtype=int)
if len(exec_result) == 0:
# If quadkey not found => GHSL built-up area is zero => treat as complete
completeness_code = 1
elif len(exec_result) == 1:
completeness_code = exec_result[0][0]
else: # More than one entries found, this is an error
# This should not happen, as the database should not allow two entries with the
# same primary key
logger.error(
"ERROR in get_tile_automatic_completeness: "
"more than one entry found for quadkey='%s' AND source_id='%s "
% (quadkey, db_completeness_config["sourceid"])
for i, quadkey in enumerate(quadkeys):
db_completeness.cursor.execute(
sql_query % (db_table, quadkey, db_completeness_config["sourceid"])
)
completeness_code = 0
exec_result = db_completeness.cursor.fetchall()
if len(exec_result) == 0:
# If quadkey not found => GHSL built-up area is zero => treat as complete
completeness_codes[i] = 1
elif len(exec_result) == 1:
completeness_codes[i] = exec_result[0][0]
else: # More than one entry found, this is an error
# This should not happen, as the database should not allow two entries with the
# same primary key
logger.error(
"ERROR in get_automatic_completeness_of_quadkeys: "
"more than one entry found for quadkey='%s' AND source_id='%s'"
% (quadkey, db_completeness_config["sourceid"])
)
completeness_codes[i] = 0
db_completeness.close_connection()
completeness = bool(completeness_code)
completeness = numpy.array([bool(completeness_codes[i]) for i in range(len(quadkeys))])
return completeness
......@@ -18,10 +18,10 @@
import logging
import sys
from copy import deepcopy
from multiprocessing import Pool
from functools import partial
from gdecore.configuration import Configuration
from gdecore.database_queries import DatabaseQueries
from gdecore.database_storage import DatabaseStorage
from gdecore.processor import GDEProcessor
from gdecore.occupancy_cases import OccupancyCasesESRM20
......@@ -112,112 +112,44 @@ def main():
)
)
for i, data_unit_id in enumerate(data_units_ids):
# Going by data unit so as to minimise intersection operations, need to hold
# excessively large data in RAM and because building classes are associated with
# specific data units
aux_log_string = (
"Data unit '%s' (of exposure entity '%s' and occupancy case '%s')"
% (data_unit_id, exposure_entity_code, occupancy_case)
)
# Retrieve OBM buildings and assign building classes and probabilities to them
# Retrieve OBM buildings
obm_buildings_raw = (
DatabaseQueries.get_OBM_buildings_in_data_unit_by_occupancy_types(
occupancy_cases.mapping[occupancy_case],
data_units_geometries[i],
config.database_obm_buildings,
"obm_buildings",
)
)
logger.info(
"%s: %s OBM building parts retrieved"
% (aux_log_string, str(obm_buildings_raw.shape[0]))
)
if obm_buildings_raw.shape[0] > 0:
# Group parts of the same relations existing in 'obm_buildings_raw'
obm_buildings = GDEProcessor.post_process_obm_relations(obm_buildings_raw)
else:
obm_buildings = deepcopy(obm_buildings_raw)
logger.info(
"%s: %s OBM buildings identified"
% (aux_log_string, str(obm_buildings.shape[0]))
)
del obm_buildings_raw
# Calculate number of OBM buildings per quadkey
obm_buildings_per_quadkey = GDEProcessor.calculate_buildings_per_quadkey(
obm_buildings["quadkey"].to_numpy()
)
# Retrieve building classes of this data unit
data_unit_building_classes = DatabaseQueries.get_building_classes_of_data_unit(
data_unit_id,
occupancy_case,
aggregated_source_id,
config.database_gde_tiles,
"data_units_buildings",
)
logger.info(
"%s: %s building classes identified"
% (aux_log_string, str(data_unit_building_classes.shape[0]))
)
# Assign building classes to OBM buildings
obm_buildings_building_classes = (
GDEProcessor.assign_building_classes_to_obm_buildings(
obm_buildings, data_unit_building_classes, occupancy_case
)
)
logger.info(
"%s: %s OBM buildings with assigned building classes"
% (aux_log_string, str(len(obm_buildings_building_classes.keys())))
)
# Store building classes of OBM buildings
DatabaseStorage.store_OBM_building_classes(
data_unit_id,
occupancy_case,
aggregated_source_id,
obm_buildings_building_classes,
config.database_gde_tiles,
"gde_buildings",
)
# Prepare data unit IDs and geometries to pass on to parallelisation
data_units_and_geometries = [
(data_units_ids[j], data_units_geometries[j])
for j in range(len(data_units_ids))
]
# Process data units in parallel
# Going by data unit so as to minimise intersection operations, the need to hold
# excessively large data in RAM and because building classes are associated with
# specific data units
p = Pool(processes=config.number_cores)
func = partial(
GDEProcessor.process_data_unit,
config,
aggregated_source_id,
exposure_entity_code,
occupancy_case,
occupancy_cases.mapping[occupancy_case],
)
summary_values = p.map(func, data_units_and_geometries)
p.close()
p.join()
# Retrieve data-unit tiles (quadkey, aggregated_buildings) as a Pandas DataFrame
data_unit_tiles = DatabaseQueries.get_data_unit_tiles_of_data_unit_as_DataFrame(
logger.info(
"Summary for exposure entity '%s' and occupancy case '%s' "
"(values: number of OBM building parts retrieved, "
"number of OBM buildings identified, number of building classes identified, "
"number of OBM buildings to which building classes were assigned, "
"number of data-unit tiles retrieved)" % (exposure_entity_code, occupancy_case)
)
for i, data_unit_id in enumerate(data_units_ids):
aux_str_i = "Data unit '%s' of exposure entity '%s' and occupancy case '%s'" % (
data_unit_id,
exposure_entity_code,
occupancy_case,
aggregated_source_id,
config.database_gde_tiles,
"data_unit_tiles",
)
logger.info(
"%s: %s data-unit tiles retrieved"
% (aux_log_string, str(data_unit_tiles.shape[0]))
)
# Calculate remainder buildings in data-unit tiles
data_unit_tiles = GDEProcessor.process_group_data_unit_tiles(
data_unit_tiles,
obm_buildings_per_quadkey,
config.database_completeness,
"obm_built_area_assessments",
config.number_cores,
)
# Store number of OBM and remainder buildings of the data-unit tiles
DatabaseStorage.store_number_OBM_and_remainder_buildings(
data_unit_id,
occupancy_case,
aggregated_source_id,
data_unit_tiles,
config.database_gde_tiles,
"data_unit_tiles",
)
logger.info("%s: %s" % (aux_str_i, ", ".join(summary_values[i])))
# Leave the program
logger.info("gde-core has finished")
......
......@@ -18,13 +18,12 @@
import logging
from copy import deepcopy
from multiprocessing import Pool
from functools import partial
import numpy
import mercantile
import pandas
import pyproj
from gdecore.database_queries import DatabaseQueries
from gdecore.database_storage import DatabaseStorage
logger = logging.getLogger()
......@@ -717,15 +716,14 @@ class GDEProcessor:
def process_group_data_unit_tiles(
data_unit_tiles,
obm_buildings_per_quadkey,
db_completeness_config,
db_table,
number_cores=1,
):
"""This function processes the data-unit tiles contained in 'data_unit_tiles' using as
many cores as indicated by 'number_cores'. The processing consists of retrieving the
OSM-completeness value of each data-unit tile and calculating the number of remainder
buildings as a function of the number of aggregated buildings, OBM buildings and
completeness.
"""This function calculates the number of buildings expected to exist in each data-unit
tile of 'data_unit_tiles' as a function of its numbers of aggregated buildings and OBM
buildings, as well as its completeness status. These are called the "remainder"
buildings. If the tile is complete, the number of remainder buildings is zero. If the
tile is incomplete, the number of remainder buildings is the difference between the
number of aggregated buildings and the number of OBM buildings, with a minimum value of
zero.
Args:
data_unit_tiles (Pandas DataFrame):
......@@ -735,35 +733,11 @@ class GDEProcessor:
aggregated_buildings (float):
Number of buildings in the data-unit tile as per the aggregated exposure
model with ID 'aggregated_source_id'.
complete (bool):
True if the tile is OSM-complete, False if it is OSM-incomplete.
obm_buildings_per_quadkey (Pandas DataFrame):
Pandas DataFrame with number of OBM buildings ("counts" column) per quadkey
(index).
db_completeness_config (dict):
Dictionary containing the credentials needed to connect to the SQL database in
which completeness data are stored. The keys of the dictionary need to be:
host (str):
SQL database host address.
dbname (str):
Name of the SQL database.
port (int):
Port where the SQL database can be found.
username (str):
User name to connect to the SQL database.
password (str):
Password associated with self.username.
sourceid (int):
ID of the automatic completeness source dataset that will be sought for.
db_table (str):
Name of the table of the SQL database where the completeness data are stored. It
is assumed that this table contains, at least, the following fields:
quadkey (str):
String indicating the quadkey of a tile.
completeness (int):
Completeness code: 0 = incomplete, 1 = complete.
source_id (int):
ID of the source used to define the built-up area and completeness.
number_cores (int):
Number of CPU cores to be used to run this function. Default: 1.
Returns:
data_unit_tiles_full (Pandas DataFrame):
......@@ -787,103 +761,184 @@ class GDEProcessor:
data_unit_tiles_full = data_unit_tiles_full.rename(columns={"counts": "obm_buildings"})
data_unit_tiles_full["obm_buildings"] = data_unit_tiles_full["obm_buildings"].fillna(0)
# Prepare 'data_unit_tiles' for parallel processing:
# each tuple contains (quadkey, aggregated_buildings, obm_buildings) of a data-unit tile
data_unit_tiles_list = [
# Calculate aggregated - OBM buildings for all data-unit tiles, irrespective of their
# completeness status
remainder_buildings = numpy.clip(
(
data_unit_tiles_full["quadkey"].to_numpy()[j],
data_unit_tiles_full["aggregated_buildings"].to_numpy()[j],
data_unit_tiles_full["obm_buildings"].to_numpy()[j],
)
for j in range(data_unit_tiles.shape[0])
]
# Process data-unit tiles in parallel
p = Pool(processes=number_cores)
func = partial(
GDEProcessor.process_data_unit_tile,
db_completeness_config,
db_table,
data_unit_tiles_full["aggregated_buildings"].to_numpy()
- data_unit_tiles_full["obm_buildings"].to_numpy()
),
a_min=0.0,
a_max=None,
)
completeness_and_remainder = p.map(func, data_unit_tiles_list)
p.close()
p.join()
# Make remainder buildings equal to zero when data-unit tiles are complete
remainder_buildings[data_unit_tiles_full["complete"].to_numpy()] = 0.0
data_unit_tiles_full["remainder_buildings"] = [
completeness_and_remainder[i][0] for i in range(len(completeness_and_remainder))
]
data_unit_tiles_full["complete"] = [
completeness_and_remainder[i][1] for i in range(len(completeness_and_remainder))
]
data_unit_tiles_full["remainder_buildings"] = remainder_buildings
return data_unit_tiles_full
@staticmethod
def process_data_unit_tile(db_completeness_config, db_table, data_unit_tiles_attributes):
"""This function calculates the number of buildings expected to exist in the data-unit
tile apart from the OBM buildings, i.e. the "remainder" buildings. If the tile is
complete, the number of remainder buildings is zero. If the tile is incomplete, the
number of remainder buildings is the difference between the number of aggregated
buildings and the number of OBM buildings, with a minimum value of zero.
def process_data_unit(
configuration,
aggregated_source_id,
exposure_entity_code,
occupancy_case,
occupancy_types,
data_unit_id_and_geometry,
):
"""
This function processes the OBM buildings and data-unit tiles associated with a data
unit whose ID and geometry are specified in 'data_unit_id_and_geometry', associated in
turn with a specific 'aggregated_source_id', 'exposure_entity_code' and
'occupancy_case'. The specific occupancy types associated with 'occupancy_case' are
defined by 'occupancy_types'.
The processing consists of the following (main) tasks:
- Retrieve parts of OBM buildings associated with the data unit, whose occupancy
types are as indicated in 'occupancy_types', from the OBM buildings table.
- Gather parts of OBM buildings that belong to the same relation ID.
- Assign building classes to the OBM buildings.
- Store building classes of OBM buildings
- Calculate remainder buildings in the data-unit tiles.
- Store number of OBM and remainder buildings of the data-unit tiles.
- Gathering summary values to return for logging purposes.
Args:
db_completeness_config (dict):
Dictionary containing the credentials needed to connect to the SQL database in
which completeness data are stored. The keys of the dictionary need to be:
host (str):
SQL database host address.
dbname (str):
Name of the SQL database.
port (int):
Port where the SQL database can be found.
username (str):
User name to connect to the SQL database.
password (str):
Password associated with self.username.
sourceid (int):
ID of the automatic completeness source dataset that will be sought for.
db_table (str):
Name of the table of the SQL database where the completeness data are stored. It
is assumed that this table contains, at least, the following fields:
quadkey (str):
String indicating the quadkey of a tile.
completeness (int):
Completeness code: 0 = incomplete, 1 = complete.
source_id (int):
ID of the source used to define the built-up area and completeness.
data_unit_tiles_attributes (tuple of (str, float, int)):
Attributes of this data-unit tile. The elements of the tuple are:
quadkey (str):
String indicating the quadkey of a tile.
aggregated_buildings (float):
Number of buildings in the data-unit tile as per an aggregated exposure
model.
obm_buildings (int):
Number of OBM buildings in the data-unit tile.
configuration (Configuration):
Instance of the Configuration class containing configuration parameters.
aggregated_source_id (int):
ID of the source of the aggregated exposure model to be processed.
exposure_entity_code (str):
3-character code of the exposure entity to be processed.
occupancy_case (str):
Name of the occupancy case (e.g. "residential", "commercial", "industrial") to
be processed.
occupancy_types (list of str):
List of names of target occupancy types (e.g. "RES", "RES1", "COM5", etc).
data_unit_id_and_geometry (tuple):
Tuple with two elements (in order):
data_unit_id (str):
ID of the data unit to be processed.
geometry (Shapely Polygon or MultiPolygon):
Geometry of the data unit with ID 'data_unit_id'.
These attributes are passed as a tuple to allow for this method to be used
together with multiprocessing.Pool.map.
Returns:
complete, remainder_buildings (tuple of (float, bool)):
The elements of the tuple are:
remainder_buildings (float):
Number of remainder buildings in the data-unit tile.
complete (bool):
True if the 'quadkey' in 'data_unit_tiles_attributes' is OSM-complete,
False if it is OSM-incomplete.
summary_values (list of str):
List with five strings (in order):
- Number of OBM building parts retrieved.
- Number of OBM buildings identified (after grouping parts of the same
relations).
- Number of building classes identified.
- Number of OBM buildings to which building classes were assigned.
- Number of data-unit tiles retrieved.
"""
# Split contents of data_unit_tiles_attributes
quadkey = data_unit_tiles_attributes[0]
aggregated_buildings = data_unit_tiles_attributes[1]
obm_buildings = data_unit_tiles_attributes[2]
# Split the contents of 'data_unit_id_and_geometry'
data_unit_id = data_unit_id_and_geometry[0]
data_unit_geometry = data_unit_id_and_geometry[1]
# Retrieve completeness value
complete = DatabaseQueries.get_automatic_completeness_of_quadkey(
quadkey, db_completeness_config, db_table
logger.info(
"Processing data unit '%s' of exposure entity '%s' and occupancy case '%s'"
% (data_unit_id, exposure_entity_code, occupancy_case)
)
if complete:
remainder_buildings = 0.0
summary_values = []
# Retrieve OBM buildings and assign building classes and probabilities to them
# Retrieve OBM buildings
obm_buildings_raw = DatabaseQueries.get_OBM_buildings_in_data_unit_by_occupancy_types(
occupancy_types,
data_unit_geometry,
configuration.database_obm_buildings,
"obm_buildings",
)
# Number of OBM building parts retrieved
summary_values.append(str(obm_buildings_raw.shape[0]))
if obm_buildings_raw.shape[0] > 0:
# Group parts of the same relations existing in 'obm_buildings_raw'
obm_buildings = GDEProcessor.post_process_obm_relations(obm_buildings_raw)
else:
remainder_buildings = max(0.0, aggregated_buildings - obm_buildings)
obm_buildings = deepcopy(obm_buildings_raw)
# Number of OBM buildings identified (after grouping parts of the same relations)
summary_values.append(str(obm_buildings.shape[0]))
del obm_buildings_raw
# Calculate number of OBM buildings per quadkey
obm_buildings_per_quadkey = GDEProcessor.calculate_buildings_per_quadkey(
obm_buildings["quadkey"].to_numpy()
)
# Retrieve building classes of this data unit
data_unit_building_classes = DatabaseQueries.get_building_classes_of_data_unit(
data_unit_id,
occupancy_case,
aggregated_source_id,
configuration.database_gde_tiles,
"data_units_buildings",
)
# Number of building classes identified
summary_values.append(str(data_unit_building_classes.shape[0]))
# Assign building classes to OBM buildings
obm_buildings_building_classes = GDEProcessor.assign_building_classes_to_obm_buildings(
obm_buildings, data_unit_building_classes, occupancy_case
)
# Number of OBM buildings to which building classes were assigned
summary_values.append(str(len(obm_buildings_building_classes.keys())))
# Store building classes of OBM buildings
DatabaseStorage.store_OBM_building_classes(
data_unit_id,
occupancy_case,
aggregated_source_id,
obm_buildings_building_classes,
configuration.database_gde_tiles,
"gde_buildings",
)
# Retrieve data-unit tiles (quadkey, aggregated_buildings) as a Pandas DataFrame
data_unit_tiles = DatabaseQueries.get_data_unit_tiles_of_data_unit_as_DataFrame(
data_unit_id,
occupancy_case,
aggregated_source_id,
configuration.database_gde_tiles,
"data_unit_tiles",
)
# Number of data-unit tiles retrieved
summary_values.append(str(data_unit_tiles.shape[0]))
# Retrieve completeness of data-unit tiles
completeness = DatabaseQueries.get_automatic_completeness_of_quadkeys(
data_unit_tiles["quadkey"].to_numpy(),
configuration.database_completeness,
"obm_built_area_assessments",
)
data_unit_tiles["complete"] = completeness
# Calculate remainder buildings in data-unit tiles
data_unit_tiles = GDEProcessor.process_group_data_unit_tiles(
data_unit_tiles,
obm_buildings_per_quadkey,
)
# Store number of OBM and remainder buildings of the data-unit tiles
DatabaseStorage.store_number_OBM_and_remainder_buildings(
data_unit_id,
occupancy_case,
aggregated_source_id,
data_unit_tiles,
configuration.database_gde_tiles,
"data_unit_tiles",
)
return (remainder_buildings, complete)
return summary_values
......@@ -391,7 +391,7 @@ def test_get_data_unit_tiles_of_data_unit_as_DataFrame(test_db):
assert "aggregated_buildings" in returned_data_unit_tiles.columns
def test_get_automatic_completeness_of_quadkey(test_db):
def test_get_automatic_completeness_of_quadkeys(test_db):
# Database connection (the Configuration class will define the credentials based on whether
# the code is running in the CI or locally)
config = Configuration(
......@@ -407,9 +407,9 @@ def test_get_automatic_completeness_of_quadkey(test_db):
expected_completeness = [False, False, True, True]
for i, quadkey in enumerate(quadkeys):
returned_completeness = DatabaseQueries.get_automatic_completeness_of_quadkey(
quadkey, config.database_completeness, "obm_built_area_assessments"
)
returned_completeness = DatabaseQueries.get_automatic_completeness_of_quadkeys(
quadkeys, config.database_completeness, "obm_built_area_assessments"
)
assert returned_completeness == expected_completeness[i]
for i, quadkey in enumerate(quadkeys):
assert returned_completeness[i] == expected_completeness[i]
......@@ -436,43 +436,7 @@ def test_calculate_buildings_per_quadkey():
assert returned_counts_per_quadkey.loc["122010321033023132", "counts"] == 4
def test_process_data_unit_tile(test_db):
# Database connection (the Configuration class will define the credentials based on whether
# the code is running in the CI or locally)
config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
)