Commit da77eb51 authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Added feature to store data-unit tiles to GDE Tiles database

parent 7388bf24
Pipeline #29476 passed with stage
in 3 minutes and 13 seconds
...@@ -4,6 +4,9 @@ data_pathname: path_to_directory_with_model_data ...@@ -4,6 +4,9 @@ data_pathname: path_to_directory_with_model_data
boundaries_pathname: path_to_directory_with_boundary_files boundaries_pathname: path_to_directory_with_boundary_files
occupancies_to_run: residential, commercial # Need to exist for the indicated `exposure format`, industrial not supported occupancies_to_run: residential, commercial # Need to exist for the indicated `exposure format`, industrial not supported
exposure_entities_to_run: all # Either "all", a comma-space-separated list of entity names, or a name of a .txt or .csv file exposure_entities_to_run: all # Either "all", a comma-space-separated list of entity names, or a name of a .txt or .csv file
exposure_entities_code: # Either "ISO3" in this line, or a nested structure mapping exposure entity names to 3-character codes
Exposure Entity 1: EE1
Exposure Entity 2: XXX
number_cores: 1 # Number of cores used for parallelisation number_cores: 1 # Number of cores used for parallelisation
database_built_up: # Database where built-up areas per quadtile are stored database_built_up: # Database where built-up areas per quadtile are stored
host: host_name host: host_name
......
...@@ -53,6 +53,12 @@ class AggregatedExposureModel(abc.ABC): ...@@ -53,6 +53,12 @@ class AggregatedExposureModel(abc.ABC):
""" """
def __init__(self, configuration): def __init__(self, configuration):
"""
Args:
configuration (Configuration object):
Instance of the Configuration class.
"""
self.model_name = configuration.model_name self.model_name = configuration.model_name
self.exposure_format = configuration.exposure_format self.exposure_format = configuration.exposure_format
self.occupancy_cases = None self.occupancy_cases = None
...@@ -249,6 +255,11 @@ class ExposureModelESRM20(AggregatedExposureModel): ...@@ -249,6 +255,11 @@ class ExposureModelESRM20(AggregatedExposureModel):
data_pathname (str): data_pathname (str):
Path to the directory that contains the input aggregated exposure model Path to the directory that contains the input aggregated exposure model
data. data.
exposure_entities_code (str or dict):
If "ISO3", the country ISO3 codes will be automatically retrieved and
used as the codes for the exposure entities. If it is a dictionary
instead, its contents will be used to assign the codes to the exposure
entities.
Returns: Returns:
exposure_entities (dictionary of ExposureEntity): exposure_entities (dictionary of ExposureEntity):
...@@ -320,7 +331,9 @@ class ExposureModelESRM20(AggregatedExposureModel): ...@@ -320,7 +331,9 @@ class ExposureModelESRM20(AggregatedExposureModel):
for exposure_entity in read_names: for exposure_entity in read_names:
if exposure_entity not in exposure_entities.keys(): if exposure_entity not in exposure_entities.keys():
exposure_entities[exposure_entity] = ExposureEntity(exposure_entity) exposure_entities[exposure_entity] = ExposureEntity(
exposure_entity, configuration.exposure_entities_code
)
output = self._map_data_units_types(data_units_types_row.loc[exposure_entity]) output = self._map_data_units_types(data_units_types_row.loc[exposure_entity])
(data_units_type, data_units_level, data_units_definition) = output (data_units_type, data_units_level, data_units_definition) = output
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
# along with this program. If not, see http://www.gnu.org/licenses/. # along with this program. If not, see http://www.gnu.org/licenses/.
import os import os
import sys
import logging import logging
import yaml import yaml
from dotenv import load_dotenv from dotenv import load_dotenv
...@@ -42,6 +43,16 @@ class Configuration: ...@@ -42,6 +43,16 @@ class Configuration:
data will be retrieved. data will be retrieved.
self.exposure_entities_to_run (list of str): self.exposure_entities_to_run (list of str):
List of names of the exposure entities for which the data units will be retrieved. List of names of the exposure entities for which the data units will be retrieved.
self.exposure_entities_code (str or dict):
If "ISO3" (str), the country ISO3 codes associated with the names of the exposure
entities will be automatically retrieved and used as their codes. Otherwise it needs
to be a dictionary whose keys are the names of the exposure entities. The content
within each key is a 3-character string to be used as the code for the corresponding
exposure entity. E.g.:
self.exposure_entities_code = {
"Exposure Entity 1": "EE1",
"Exposure Entity 2": "XXX"
}
self.number_cores (int): self.number_cores (int):
Number of cores that will be used to run the code. Number of cores that will be used to run the code.
self.database_built_up (dict): self.database_built_up (dict):
...@@ -81,6 +92,7 @@ class Configuration: ...@@ -81,6 +92,7 @@ class Configuration:
"boundaries_pathname", "boundaries_pathname",
"occupancies_to_run", "occupancies_to_run",
"exposure_entities_to_run", "exposure_entities_to_run",
"exposure_entities_code",
"number_cores", "number_cores",
"database_built_up", "database_built_up",
"database_gde_tiles", "database_gde_tiles",
...@@ -110,6 +122,24 @@ class Configuration: ...@@ -110,6 +122,24 @@ class Configuration:
self.exposure_entities_to_run = self._assign_listed_parameters( self.exposure_entities_to_run = self._assign_listed_parameters(
config, "exposure_entities_to_run" config, "exposure_entities_to_run"
) )
try:
self.exposure_entities_code = self._validate_exposure_entities_code(config)
except ValueError as e:
error_message = (
"Error: the configuration file assigns unsupported values "
"to exposure_entities_code. The program cannot run. %s" % (e)
)
logger.critical(error_message)
sys.exit(1)
except TypeError as e:
error_message = (
"Error: the configuration file assigns an unsupported data type "
"to exposure_entities_code. The program cannot run. %s" % (e)
)
logger.critical(error_message)
sys.exit(1)
self.number_cores = self._assign_integer_parameter(config, "number_cores") self.number_cores = self._assign_integer_parameter(config, "number_cores")
self.database_built_up = self._retrieve_database_credentials( self.database_built_up = self._retrieve_database_credentials(
config, "database_built_up", "test_db_built_up.env", force_config_over_hierarchies config, "database_built_up", "test_db_built_up.env", force_config_over_hierarchies
...@@ -409,3 +439,54 @@ class Configuration: ...@@ -409,3 +439,54 @@ class Configuration:
} }
return db_config return db_config
def _validate_exposure_entities_code(self, config):
    """This function retrieves the content of config["exposure_entities_code"], and checks
    whether it complies with the following conditions:
        - It must be either a string or a dictionary.
        - If it is a string, it must be equal to "ISO3" (the comparison is
          case-insensitive).
        - If it is a dictionary, the content of each key must be a 3-character string.

    Examples of valid values of config["exposure_entities_code"]:
        1) "ISO3"
        2) {"Exposure Entity 1": "EE1",
            "Exposure Entity 2": "XXX"
            }

    Args:
        config (dictionary):
            The configuration file read as a dictionary. It may be an empty dictionary.

    Returns:
        assigned_parameter (str, dictionary or None):
            The content of config["exposure_entities_code"], returned unmodified. If
            self._assign_parameter yields None, None is returned without validation.

    Raises:
        ValueError:
            If the value is a string other than "ISO3", or if the content of a
            dictionary key is a string whose length is not 3.
        TypeError:
            If the value is neither a string nor a dictionary, or if the content of a
            dictionary key is not a string (e.g. a nested dictionary or a number).
    """

    assigned_parameter = self._assign_parameter(config, "exposure_entities_code")

    if assigned_parameter is None:
        return None

    if isinstance(assigned_parameter, str):
        if assigned_parameter.upper() != "ISO3":
            raise ValueError("String must be 'ISO3'.")
        return assigned_parameter

    if not isinstance(assigned_parameter, dict):
        raise TypeError("The value must be a string or a dictionary.")

    for entity_name, code in assigned_parameter.items():
        if isinstance(code, dict):
            raise TypeError(
                "The dictionary must contain only one level of keys "
                "(key '%s' contains another dictionary)." % (entity_name,)
            )
        if not isinstance(code, str):
            # NOTE(review): presumably the configuration is read from a YAML file, where
            # unquoted values such as 123 or NO may not be read as strings (confirm
            # against the loader). Naming the key and the type makes this easy to spot.
            raise TypeError(
                "The content of each dictionary key must be a 3-character string "
                "(key '%s' contains a value of type %s)."
                % (entity_name, type(code).__name__)
            )
        if len(code) != 3:
            raise ValueError(
                "The content of each dictionary key must be a 3-character string "
                "(key '%s' contains '%s')." % (entity_name, code)
            )

    return assigned_parameter
...@@ -44,6 +44,22 @@ class DataUnit: ...@@ -44,6 +44,22 @@ class DataUnit:
""" """
def __init__(self, dataunit_id, geometries_table, target_column_name): def __init__(self, dataunit_id, geometries_table, target_column_name):
"""
Args:
dataunit_id (str):
ID of the DataUnit (e.g. ID of the administrative unit it represents).
geometries_table (GeoPandas GeoDataFrame):
GeoPandas GeoDataFrame containing at least two columns:
target_column_name (str):
Column where the ID of the Data Unit will be sought. The data type of
the elements of this column needs to be 'string', otherwise the function
might fail to find the name of the Data Unit in it.
geometry (Shapely geometry):
Geometry.
target_column_name (str):
Name of the column in which the ID of the Data Unit will be sought.
"""
self.id = dataunit_id self.id = dataunit_id
self.geometry = self.get_data_unit_geometry(geometries_table, target_column_name) self.geometry = self.get_data_unit_geometry(geometries_table, target_column_name)
self.data_unit_tiles = None self.data_unit_tiles = None
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
# along with this program. If not, see http://www.gnu.org/licenses/. # along with this program. If not, see http://www.gnu.org/licenses/.
import logging import logging
import iso3166
from multiprocessing import Pool from multiprocessing import Pool
from functools import partial from functools import partial
from gdeimporter.tools.data_unit_tiles import DataUnitTilesHelper from gdeimporter.tools.data_unit_tiles import DataUnitTilesHelper
...@@ -32,6 +33,9 @@ class ExposureEntity: ...@@ -32,6 +33,9 @@ class ExposureEntity:
Attributes: Attributes:
self.name (str): self.name (str):
Name of the exposure entity. Name of the exposure entity.
self.code (str):
3-character code that uniquely identifies this exposure entity. If the exposure
entity is a country, this is the ISO3 code for the country.
self.occupancy_cases (dict): self.occupancy_cases (dict):
Dictionary defining the type, level and definition of the data units used for each Dictionary defining the type, level and definition of the data units used for each
occupancy case of the model (e.g. residential, commercial), with the following occupancy case of the model (e.g. residential, commercial), with the following
...@@ -77,12 +81,28 @@ class ExposureEntity: ...@@ -77,12 +81,28 @@ class ExposureEntity:
|_ ... |_ ...
""" """
def __init__(self, name): def __init__(self, name, config_code):
"""
Args:
name (str):
Name of the exposure entity.
config_code (str or dict):
Either "ISO3" (str) or a dictionary, of which name needs to be a key.
"""
self.name = name self.name = name
self.code = self._interpret_exposure_entities_code(config_code)
self.occupancy_cases = {} self.occupancy_cases = {}
def create_data_unit_tiles( def create_data_unit_tiles(
self, occupancy_case, number_cores, db_built_up_config, db_table self,
occupancy_case,
number_cores,
db_built_up_config,
db_built_up_table,
db_data_unit_tiles_config,
db_data_unit_tiles_table,
aggregated_source_id,
): ):
"""This function creates the data-unit tiles associated with all data units of the """This function creates the data-unit tiles associated with all data units of the
ExposureEntity for a specified 'occupancy_case'. The latter needs to be a key of ExposureEntity for a specified 'occupancy_case'. The latter needs to be a key of
...@@ -112,7 +132,7 @@ class ExposureEntity: ...@@ -112,7 +132,7 @@ class ExposureEntity:
Password associated with username. Password associated with username.
sourceid (int): sourceid (int):
ID of the built-up area source dataset that will be sought for. ID of the built-up area source dataset that will be sought for.
db_table (str): db_built_up_table (str):
Name of the table of the SQL database where the built-up area values are stored. Name of the table of the SQL database where the built-up area values are stored.
It is assumed that this table contains, at least, the following fields: It is assumed that this table contains, at least, the following fields:
quadkey (str): quadkey (str):
...@@ -121,6 +141,46 @@ class ExposureEntity: ...@@ -121,6 +141,46 @@ class ExposureEntity:
Value of the built-up area to be retrieved. Value of the built-up area to be retrieved.
source_id (int): source_id (int):
ID of the source used to define the built-up area. ID of the source used to define the built-up area.
db_data_unit_tiles_config (dict):
Dictionary containing the credentials needed to connect to the SQL database in
which information on the data-unit tiles is stored. The keys of the dictionary
need to be:
host (str):
SQL database host address.
dbname (str):
Name of the SQL database.
port (int):
Port where the SQL database can be found.
username (str):
User name to connect to the SQL database.
password (str):
Password associated with self.username.
db_data_unit_tiles_table (str):
Name of the table of the SQL database where the data-unit tiles are stored. It
is assumed that this table contains, at least, the following fields:
quadkey (str):
String indicating the quadkey of a tile.
aggregated_source_id (int):
ID of the source of the aggregated exposure model.
occupancy_case (enum):
SQL enumerated type describing the building occupancy cases.
exposure_entity (str):
3-character code of the exposure entity.
data_unit_id (str):
ID of the data unit.
size_data_unit_tile_area (float):
Surface area of the data-unit tile, in square metres.
size_data_unit_tile_built_up_area (float):
Built-up area of the data-unit tile, in square metres.
fraction_data_unit_area (float):
Fraction (0.0, 1.0] that the surface area of the data-unit tile
represents with respect to the surface area of the data unit.
fraction_data_unit_built_up_area (float):
Fraction [0.0, 1.0] that the built-up area of the data-unit tile
represents with respect to the built-up area contained in the data unit.
It can be zero.
aggregated_source_id (int):
ID of the source of the aggregated exposure model.
Returns: Returns:
This function writes the 'data_unit_tiles' attribute of the data units of the This function writes the 'data_unit_tiles' attribute of the data units of the
...@@ -139,19 +199,94 @@ class ExposureEntity: ...@@ -139,19 +199,94 @@ class ExposureEntity:
for dataunit in self.occupancy_cases[occupancy_case]["data_units"].values() for dataunit in self.occupancy_cases[occupancy_case]["data_units"].values()
] ]
# Create the data-unit tiles, parallelising by data unit
p = Pool(processes=number_cores) p = Pool(processes=number_cores)
func = partial( func = partial(
DataUnitTilesHelper.define_data_unit_tiles_and_attributes, DataUnitTilesHelper.define_data_unit_tiles_and_attributes,
db_built_up_config, db_built_up_config,
db_table, db_built_up_table,
) )
all_data_unit_tiles = p.map(func, data_units_geoms) all_data_unit_tiles = p.map(func, data_units_geoms)
p.close() p.close()
p.join() p.join()
# Write data-unit tiles to self
for i, data_unit_id in enumerate(data_units_ids): for i, data_unit_id in enumerate(data_units_ids):
self.occupancy_cases[occupancy_case]["data_units"][ self.occupancy_cases[occupancy_case]["data_units"][
data_unit_id data_unit_id
].data_unit_tiles = all_data_unit_tiles[i] ].data_unit_tiles = all_data_unit_tiles[i]
# Write data-unit tiles to database, parallelising by data unit
p = Pool(processes=number_cores)
func = partial(
DataUnitTilesHelper.write_data_unit_tiles_to_database,
db_data_unit_tiles_config,
db_data_unit_tiles_table,
aggregated_source_id,
occupancy_case,
self.code,
)
_ = p.map(func, self.occupancy_cases[occupancy_case]["data_units"].values())
p.close()
p.join()
return return
def _interpret_exposure_entities_code(self, config_code):
    """This function interprets the value of exposure_entities_code given as configuration.
    If config_code is "ISO3" (case-insensitive), the ISO3 code associated with self.name is
    retrieved by means of self.retrieve_country_ISO3. Otherwise, self.name is sought as a
    key of config_code and the content of that key is returned.

    Args:
        config_code (str or dict):
            Either "ISO3" (str) or a dictionary, of which self.name needs to be a key.

    Returns:
        exposure_entity_code (str):
            Value returned by self.retrieve_country_ISO3(self.name) if config_code is
            "ISO3", or the content of config_code[self.name] otherwise.

    Raises:
        KeyError:
            If config_code is a dictionary that does not contain self.name as a key.
        TypeError:
            If config_code is neither "ISO3" nor an object that can be indexed by
            self.name (e.g. None, or any other string).
    """

    if isinstance(config_code, str) and config_code.upper() == "ISO3":
        return self.retrieve_country_ISO3(self.name)

    # Same exception types as a plain config_code[self.name] would raise, but with
    # messages that point at the configuration parameter that needs fixing.
    try:
        exposure_entity_code = config_code[self.name]
    except KeyError as err:
        raise KeyError(
            "exposure_entities_code does not define a code for exposure entity '%s'"
            % (self.name,)
        ) from err
    except TypeError as err:
        raise TypeError(
            "exposure_entities_code must be 'ISO3' or a dictionary whose keys are the "
            "names of the exposure entities, got a value of type %s"
            % (type(config_code).__name__,)
        ) from err

    return exposure_entity_code
@staticmethod
def retrieve_country_ISO3(country_name):
    """This function looks up the ISO3 code of a country by its name, using the
    countries_by_name index of the iso3166 Python module. Before the look-up,
    underscores in country_name are replaced by spaces, the name is converted to upper
    case, and names listed in the alias table below are replaced by their alias.

    Args:
        country_name (str):
            Name of the country whose ISO3 code will be sought.

    Returns:
        iso3_code (str):
            ISO3 code of country_name. If country_name is not found, None is returned
            and a warning is logged.
    """

    # Names that need to be replaced so that they can be found in iso3166
    iso3166_aliases = {
        "UNITED KINGDOM": "UNITED KINGDOM OF GREAT BRITAIN AND NORTHERN IRELAND",
        "MOLDOVA": "MOLDOVA, REPUBLIC OF",
    }

    normalised_name = country_name.replace("_", " ").upper()
    lookup_name = iso3166_aliases.get(normalised_name, normalised_name)

    countries_index = iso3166.countries_by_name

    if lookup_name not in countries_index:
        logger.warning(
            "retrieve_country_ISO3 could not retrieve an ISO3 code for %s" % (country_name)
        )
        return None

    return countries_index[lookup_name].alpha3
...@@ -70,6 +70,9 @@ def main(): ...@@ -70,6 +70,9 @@ def main():
config.number_cores, config.number_cores,
config.database_built_up, config.database_built_up,
"obm_built_area_assessments", "obm_built_area_assessments",
config.database_gde_tiles,
"data_unit_tiles",
aem_source_id,
) )
print("Name of the model: %s" % (aem.model_name)) print("Name of the model: %s" % (aem.model_name))
...@@ -79,7 +82,7 @@ def main(): ...@@ -79,7 +82,7 @@ def main():
print("Data retrieved:") print("Data retrieved:")
for exposure_entity in aem.exposure_entities.keys(): for exposure_entity in aem.exposure_entities.keys():
print(" %s:" % exposure_entity) print(" %s (%s):" % (exposure_entity, aem.exposure_entities[exposure_entity].code))
for case in aem.exposure_entities[exposure_entity].occupancy_cases.keys(): for case in aem.exposure_entities[exposure_entity].occupancy_cases.keys():
print(" %s:" % case) print(" %s:" % case)
for attr in ["data_units_type", "data_units_level", "data_units_definition"]: for attr in ["data_units_type", "data_units_level", "data_units_definition"]:
......
...@@ -559,3 +559,164 @@ class DataUnitTilesHelper: ...@@ -559,3 +559,164 @@ class DataUnitTilesHelper:
db_built_up_areas.close_connection() db_built_up_areas.close_connection()
return built_up_areas return built_up_areas
@staticmethod
def write_data_unit_tiles_to_database(
db_data_unit_tiles_config,
db_table,
aggregated_source_id,
occupancy_case,
exposure_entity_code,
data_unit,
):
"""This function writes to the table with name db_table in the database whose
credentials are indicated in db_data_unit_tiles_config. Each entry is a data-unit tile
from data_unit. All data-unit tiles are associated with a specific aggregated_source_id,
occupancy_case and exposure_entity_code.
Args:
db_data_unit_tiles_config (dict):
Dictionary containing the credentials needed to connect to the SQL database in
which information on the data-unit tiles is stored. The keys of the dictionary
need to be:
host (str):
SQL database host address.
dbname (str):
Name of the SQL database.
port (int):
Port where the SQL database can be found.
username (str):
User name to connect to the SQL database.
password (str):
Password associated with self.username.
db_table (str):
Name of the table of the SQL database where the data-unit tiles will be stored.
It is assumed that this table contains, at least, the following fields:
quadkey (str):
String indicating the quadkey of a tile.
aggregated_source_id (int):
ID of the source of the aggregated exposure model.
occupancy_case (enum):
SQL enumerated type describing the building occupancy cases.
exposure_entity (str):
3-character code of the exposure entity.
data_unit_id (str):
ID of the data unit.
size_data_unit_tile_area (float):
Surface area of the data-unit tile, in square metres.
size_data_unit_tile_built_up_area (float):
Built-up area of the data-unit tile, in square metres.
fraction_data_unit_area (float):
Fraction (0.0, 1.0] that the surface area of the data-unit tile
represents with respect to the surface area of the data unit.
fraction_data_unit_built_up_area (float):
Fraction [0.0, 1.0] that the built-up area of the data-unit tile
represents with respect to the built-up area contained in the data unit.
It can be zero.
aggregated_source_id (int):
ID of the source of the aggregated exposure model.
occupancy_case (str):
Name of the occupancy case (e.g. "residential", "commercial", "industrial")
associated with this data_unit.
exposure_entity_code (str):
3-character string containing the ISO3 code of the country, if the exposure
entity is a country, or a code that the user defines in the configuration file.
data_unit (DataUnit):
Instance of Dataunit.
Returns:
This function writes to the table with name db_table in the database whose
credentials are indicated in db_data_unit_tiles_config. Each entry is a data-unit
tile from data_unit.
"""
data_unit_full_id = "%s_%s" % (exposure_entity_code, data_unit.id)
sql_commands = {}
sql_commands["query"] = "SELECT COUNT(*) FROM %s"
sql_commands["query"] += " WHERE (quadkey='%s' AND aggregated_source_id='%s'"
sql_commands["query"] += " AND occupancy_case='%s' AND data_unit_id='%s');"
sql_commands["update"] = "UPDATE %s"
sql_commands["update"] += " SET (size_data_unit_tile_area,"
sql_commands["update"] += " size_data_unit_tile_built_up_area, fraction_data_unit_area,"
sql_commands["update"] += " fraction_data_unit_built_up_area) ="
sql_commands["update"] += " ('%s','%s','%s','%s')"
sql_commands["update"] += " WHERE (quadkey='%s' AND aggregated_source_id='%s'"
sql_commands["update"] += " AND occupancy_case='%s' AND data_unit_id='%s');"
sql_commands["insert"] = "INSERT INTO"
sql_commands["insert"] += " %s(quadkey, aggregated_source_id, occupancy_case,"
sql_commands["insert"] += " exposure_entity, data_unit_id, size_data_unit_tile_area,"
sql_commands["insert"] += " size_data_unit_tile_built_up_area, fraction_data_unit_area,"
sql_commands["insert"] += " fraction_data_unit_built_up_area)"
sql_commands["insert"] += " VALUES('%s','%s','%s','%s','%s','%s','%s','%s','%s');"
db_gde_tiles = Database(**db_data_unit_tiles_config)
db_gde_tiles.create_connection_and_cursor()
for i in range(data_unit.data_unit_tiles.shape[0]):
# Check if an entry already exists for this data-unit tile
db_gde_tiles.cursor.execute(
sql_commands["query"]
% (
db_table,
data_unit.data_unit_tiles["quadkey"].values[i],
str(aggregated_source_id),
occupancy_case,
data_unit_full_id,
)
)
exec_result = db_gde_tiles.cursor.fetchall()
if exec_result[0][0] > 0: # Entry exists --> update
db_gde_tiles.cursor.execute(
sql_commands["update"]
%