Commit be0700bc authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Implemented functionalities to write data-unit tiles to database

parent 7388bf24
Pipeline #28925 passed with stage
in 2 minutes and 48 seconds
......@@ -4,6 +4,9 @@ data_pathname: path_to_directory_with_model_data
boundaries_pathname: path_to_directory_with_boundary_files
occupancies_to_run: residential, commercial # Need to exist for the indicated `exposure format`, industrial not supported
exposure_entities_to_run: all # Either "all", a comma-space-separated list of entity names, or a name of a .txt or .csv file
exposure_entities_code: # Either "ISO3" on this line, or a nested structure (as below) mapping exposure entity names to 3-character codes
Exposure Entity 1: EE1
Exposure Entity 2: XXX
number_cores: 1 # Number of cores used for parallelisation
database_built_up: # Database where built-up areas per quadtile are stored
host: host_name
......
......@@ -249,6 +249,11 @@ class ExposureModelESRM20(AggregatedExposureModel):
data_pathname (str):
Path to the directory that contains the input aggregated exposure model
data.
exposure_entities_code (str or dict):
If "ISO3", the country ISO3 codes will be automatically retrieved and
used as the codes for the exposure entities. If it is a dictionary
instead, its contents will be used to assign the codes to the exposure
entities.
Returns:
exposure_entities (dictionary of ExposureEntity):
......@@ -320,7 +325,9 @@ class ExposureModelESRM20(AggregatedExposureModel):
for exposure_entity in read_names:
if exposure_entity not in exposure_entities.keys():
exposure_entities[exposure_entity] = ExposureEntity(exposure_entity)
exposure_entities[exposure_entity] = ExposureEntity(
exposure_entity, configuration.exposure_entities_code
)
output = self._map_data_units_types(data_units_types_row.loc[exposure_entity])
(data_units_type, data_units_level, data_units_definition) = output
......
......@@ -42,6 +42,16 @@ class Configuration:
data will be retrieved.
self.exposure_entities_to_run (list of str):
List of names of the exposure entities for which the data units will be retrieved.
self.exposure_entities_code (str or dict):
If "ISO3" (str), the country ISO3 codes associated with the names of the exposure
entities will be automatically retrieved and used as their codes. Otherwise it needs
to be a dictionary whose keys are the names of the exposure entities. The content
within each key is a 3-character string to be used as the code for the corresponding
exposure entity. E.g.:
self.exposure_entities_code = {
"Exposure Entity 1": "EE1",
"Exposure Entity 2": "XXX"
}
self.number_cores (int):
Number of cores that will be used to run the code.
self.database_built_up (dict):
......@@ -81,6 +91,7 @@ class Configuration:
"boundaries_pathname",
"occupancies_to_run",
"exposure_entities_to_run",
"exposure_entities_code",
"number_cores",
"database_built_up",
"database_gde_tiles",
......@@ -110,6 +121,7 @@ class Configuration:
self.exposure_entities_to_run = self._assign_listed_parameters(
config, "exposure_entities_to_run"
)
self.exposure_entities_code = self._interpret_exposure_entities_code(config)
self.number_cores = self._assign_integer_parameter(config, "number_cores")
self.database_built_up = self._retrieve_database_credentials(
config, "database_built_up", "test_db_built_up.env", force_config_over_hierarchies
......@@ -409,3 +421,58 @@ class Configuration:
}
return db_config
def _interpret_exposure_entities_code(self, config):
    """This function reads config["exposure_entities_code"] and checks that it has one of
    the two supported shapes: the string "ISO3" (compared case-insensitively) or a
    dictionary in which every value is a 3-character string. Any other content is
    rejected.

    Examples of valid values of config["exposure_entities_code"]:
    1) "ISO3"
    2) {"Exposure Entity 1": "EE1",
        "Exposure Entity 2": "XXX"
       }

    Args:
        config (dictionary):
            The configuration file read as a dictionary. It may be an empty dictionary.

    Returns:
        assigned_parameter (str, dictionary or None):
            The content of config["exposure_entities_code"], returned exactly as it was
            read (a string is not converted to upper case). None is returned if
            self._assign_parameter yields None for this parameter.

    Raises:
        OSError:
            If the content is neither "ISO3" nor a dictionary of 3-character strings. The
            same message is logged as critical before raising.
    """

    codes = self._assign_parameter(config, "exposure_entities_code")

    if codes is None:
        return None

    if isinstance(codes, str):
        # The only string accepted is "ISO3", in any combination of upper/lower case
        is_valid = codes.upper() == "ISO3"
    elif isinstance(codes, dict):
        # Every user-defined code needs to be a string of exactly three characters
        is_valid = all(isinstance(code, str) and len(code) == 3 for code in codes.values())
    else:
        is_valid = False

    if not is_valid:
        error_message = (
            "Error: the configuration file assigns unsupported values "
            "to exposure_entities_code. The program cannot run."
        )
        logger.critical(error_message)
        raise OSError(error_message)

    return codes
......@@ -17,6 +17,7 @@
# along with this program. If not, see http://www.gnu.org/licenses/.
import logging
import iso3166
from multiprocessing import Pool
from functools import partial
from gdeimporter.tools.data_unit_tiles import DataUnitTilesHelper
......@@ -32,6 +33,9 @@ class ExposureEntity:
Attributes:
self.name (str):
Name of the exposure entity.
self.code (str):
3-character code that uniquely identifies this exposure entity. If the exposure
entity is a country, this is the ISO3 code for the country.
self.occupancy_cases (dict):
Dictionary defining the type, level and definition of the data units used for each
occupancy case of the model (e.g. residential, commercial), with the following
......@@ -77,12 +81,20 @@ class ExposureEntity:
|_ ...
"""
def __init__(self, name):
def __init__(self, name, config_code):
    """Initialise the exposure entity with its name, its code and an empty dictionary of
    occupancy cases.

    Args:
        name (str):
            Name of the exposure entity.
        config_code (str or dict):
            Value of exposure_entities_code from the configuration, passed on to
            self._interpret_exposure_entities_code to obtain self.code.
    """

    self.name = name
    self.occupancy_cases = {}
    # self.name needs to be set before this call, as the code is resolved from the name
    self.code = self._interpret_exposure_entities_code(config_code)
def create_data_unit_tiles(
self, occupancy_case, number_cores, db_built_up_config, db_table
self,
occupancy_case,
number_cores,
db_built_up_config,
db_built_up_table,
db_data_unit_tiles_config,
db_data_unit_tiles_table,
aggregated_source_id,
):
"""This function creates the data-unit tiles associated with all data units of the
ExposureEntity for a specified 'occupancy_case'. The latter needs to be a key of
......@@ -112,7 +124,7 @@ class ExposureEntity:
Password associated with username.
sourceid (int):
ID of the built-up area source dataset that will be sought for.
db_table (str):
db_built_up_table (str):
Name of the table of the SQL database where the built-up area values are stored.
It is assumed that this table contains, at least, the following fields:
quadkey (str):
......@@ -121,6 +133,46 @@ class ExposureEntity:
Value of the built-up area to be retrieved.
source_id (int):
ID of the source used to define the built-up area.
db_data_unit_tiles_config (dict):
Dictionary containing the credentials needed to connect to the SQL database in
which information on the data-unit tiles is stored. The keys of the dictionary
need to be:
host (str):
SQL database host address.
dbname (str):
Name of the SQL database.
port (int):
Port where the SQL database can be found.
username (str):
User name to connect to the SQL database.
password (str):
Password associated with self.username.
db_data_unit_tiles_table (str):
Name of the table of the SQL database where the data-unit tiles are stored. It
is assumed that this table contains, at least, the following fields:
quadkey (str):
String indicating the quadkey of a tile.
aggregated_source_id (int):
ID of the source of the aggregated exposure model.
occupancy_case (enum):
SQL enumerated type describing the building occupancy cases.
exposure_entity (str):
3-character code of the exposure entity.
data_unit_id (str):
ID of the data unit.
size_data_unit_tile_area (float):
Surface area of the data-unit tile, in square metres.
size_data_unit_tile_built_up_area (float):
Built-up area of the data-unit tile, in square metres.
fraction_data_unit_area (float):
Fraction (0.0, 1.0] that the surface area of the data-unit tile
represents with respect to the surface area of the data unit.
fraction_data_unit_built_up_area (float):
Fraction [0.0, 1.0] that the built-up area of the data-unit tile
represents with respect to the built-up area contained in the data unit.
It can be zero.
aggregated_source_id (int):
ID of the source of the aggregated exposure model.
Returns:
This function writes the 'data_unit_tiles' attribute of the data units of the
......@@ -139,19 +191,94 @@ class ExposureEntity:
for dataunit in self.occupancy_cases[occupancy_case]["data_units"].values()
]
# Create the data-unit tiles, parallelising by data unit
p = Pool(processes=number_cores)
func = partial(
DataUnitTilesHelper.define_data_unit_tiles_and_attributes,
db_built_up_config,
db_table,
db_built_up_table,
)
all_data_unit_tiles = p.map(func, data_units_geoms)
p.close()
p.join()
# Write data-unit tiles to self
for i, data_unit_id in enumerate(data_units_ids):
self.occupancy_cases[occupancy_case]["data_units"][
data_unit_id
].data_unit_tiles = all_data_unit_tiles[i]
# Write data-unit tiles to database, parallelising by data unit
p = Pool(processes=number_cores)
func = partial(
DataUnitTilesHelper.write_data_unit_tiles_to_database,
db_data_unit_tiles_config,
db_data_unit_tiles_table,
aggregated_source_id,
occupancy_case,
self.code,
)
_ = p.map(func, self.occupancy_cases[occupancy_case]["data_units"].values())
p.close()
p.join()
return
def _interpret_exposure_entities_code(self, config_code):
    """This function interprets the value of exposure_entities_code given as configuration.
    If config_code is the string "ISO3" (compared case-insensitively), the country ISO3
    code associated with self.name is returned. Otherwise, self.name is sought as a key of
    config_code and the string therein contained is returned.

    Args:
        config_code (str or dict):
            Either "ISO3" (str) or a dictionary, of which self.name needs to be a key.

    Returns:
        exposure_entity_code (str):
            3-character string containing the ISO3 code of the country, if the exposure
            entity is a country, or a code that the user defines in the configuration file.
            The ISO3 case returns whatever self.retrieve_country_ISO3 returns for
            self.name.

    Raises:
        KeyError:
            If config_code is a dictionary that does not contain self.name as a key.
        TypeError:
            If config_code is neither "ISO3" nor a subscriptable mapping (e.g. None or any
            other string).
    """

    # Only strings have an upper() method: checking the type first lets a dictionary of
    # user-defined codes reach the lookup below instead of failing with AttributeError
    if isinstance(config_code, str) and config_code.upper() == "ISO3":
        return self.retrieve_country_ISO3(self.name)

    return config_code[self.name]
@staticmethod
def retrieve_country_ISO3(country_name):
    """This function looks up the ISO3 code of a country by its name in the iso3166 Python
    module. Before the look-up, underscores in country_name are replaced by spaces, the
    name is converted to upper case, and two short names ("UNITED KINGDOM", "MOLDOVA") are
    replaced by the longer names under which they are sought in iso3166.

    Args:
        country_name (str):
            Name of the country whose ISO3 code will be sought.

    Returns:
        iso3_code (str):
            ISO3 code of country_name. If country_name is not found, None is returned and
            a warning is logged.
    """

    # Short names mapped to the full names that are sought in iso3166.countries_by_name
    full_names = {
        "UNITED KINGDOM": "UNITED KINGDOM OF GREAT BRITAIN AND NORTHERN IRELAND",
        "MOLDOVA": "MOLDOVA, REPUBLIC OF",
    }

    normalised_name = country_name.replace("_", " ").upper()
    lookup_name = full_names.get(normalised_name, normalised_name)

    countries = iso3166.countries_by_name

    if lookup_name not in countries:
        logger.warning(
            "retrieve_country_ISO3 could not retrieve an ISO3 code for %s" % (country_name)
        )
        return None

    return countries[lookup_name].alpha3
......@@ -70,6 +70,9 @@ def main():
config.number_cores,
config.database_built_up,
"obm_built_area_assessments",
config.database_gde_tiles,
"data_unit_tiles",
aem_source_id,
)
print("Name of the model: %s" % (aem.model_name))
......@@ -79,7 +82,7 @@ def main():
print("Data retrieved:")
for exposure_entity in aem.exposure_entities.keys():
print(" %s:" % exposure_entity)
print(" %s (%s):" % (exposure_entity, aem.exposure_entities[exposure_entity].code))
for case in aem.exposure_entities[exposure_entity].occupancy_cases.keys():
print(" %s:" % case)
for attr in ["data_units_type", "data_units_level", "data_units_definition"]:
......
......@@ -559,3 +559,162 @@ class DataUnitTilesHelper:
db_built_up_areas.close_connection()
return built_up_areas
@staticmethod
def write_data_unit_tiles_to_database(
    db_data_unit_tiles_config,
    db_table,
    aggregated_source_id,
    occupancy_case,
    exposure_entity_code,
    data_unit,
):
    """This function writes to the table with name db_table in the database whose
    credentials are indicated in db_data_unit_tiles_config. Each entry is a data-unit tile
    from data_unit. All data-unit tiles are associated with a specific aggregated_source_id,
    occupancy_case and exposure_entity_code.

    For each data-unit tile, the table is first queried for entries with the same quadkey,
    aggregated_source_id, occupancy_case and data_unit_id:
        - If no entry exists, a new one is inserted.
        - If exactly one entry exists, its area and fraction fields are updated.
        - If more than one entry exists, an error is logged and nothing is written for
          that data-unit tile.
    The connection to the database is closed when the function finishes, also if one of the
    SQL statements raises an exception.

    Args:
        db_data_unit_tiles_config (dict):
            Dictionary containing the credentials needed to connect to the SQL database in
            which information on the data-unit tiles is stored. The keys of the dictionary
            need to be:
                host (str):
                    SQL database host address.
                dbname (str):
                    Name of the SQL database.
                port (int):
                    Port where the SQL database can be found.
                username (str):
                    User name to connect to the SQL database.
                password (str):
                    Password associated with username.
        db_table (str):
            Name of the table of the SQL database where the data-unit tiles will be stored.
            It is assumed that this table contains, at least, the following fields:
                quadkey (str):
                    String indicating the quadkey of a tile.
                aggregated_source_id (int):
                    ID of the source of the aggregated exposure model.
                occupancy_case (enum):
                    SQL enumerated type describing the building occupancy cases.
                exposure_entity (str):
                    3-character code of the exposure entity.
                data_unit_id (str):
                    ID of the data unit.
                size_data_unit_tile_area (float):
                    Surface area of the data-unit tile, in square metres.
                size_data_unit_tile_built_up_area (float):
                    Built-up area of the data-unit tile, in square metres.
                fraction_data_unit_area (float):
                    Fraction (0.0, 1.0] that the surface area of the data-unit tile
                    represents with respect to the surface area of the data unit.
                fraction_data_unit_built_up_area (float):
                    Fraction [0.0, 1.0] that the built-up area of the data-unit tile
                    represents with respect to the built-up area contained in the data unit.
                    It can be zero.
        aggregated_source_id (int):
            ID of the source of the aggregated exposure model.
        occupancy_case (str):
            Name of the occupancy case (e.g. "residential", "commercial", "industrial")
            associated with this data_unit.
        exposure_entity_code (str):
            3-character string containing the ISO3 code of the country, if the exposure
            entity is a country, or a code that the user defines in the configuration file.
        data_unit (DataUnit):
            Instance of DataUnit. Its attributes 'id' and 'data_unit_tiles' are read; the
            latter needs to contain the columns "quadkey", "size_data_unit_tile_area",
            "size_data_unit_tile_built_up_area", "fraction_data_unit_area" and
            "fraction_data_unit_built_up_area".

    Returns:
        This function writes to the table with name db_table in the database whose
        credentials are indicated in db_data_unit_tiles_config. Each entry is a data-unit
        tile from data_unit.
    """

    # NOTE(review): all values are interpolated into the SQL text as quoted strings, so a
    # value containing a single quote would break the statement. Consider using the
    # parameter binding of the database driver wrapped by Database -- TODO confirm driver.
    sql_commands = {
        "query": (
            "SELECT COUNT(*) FROM %s"
            " WHERE (quadkey='%s' AND aggregated_source_id='%s'"
            " AND occupancy_case='%s' AND data_unit_id='%s');"
        ),
        "update": (
            "UPDATE %s"
            " SET (size_data_unit_tile_area,"
            " size_data_unit_tile_built_up_area, fraction_data_unit_area,"
            " fraction_data_unit_built_up_area) ="
            " ('%s','%s','%s','%s')"
            " WHERE (quadkey='%s' AND aggregated_source_id='%s'"
            " AND occupancy_case='%s' AND data_unit_id='%s');"
        ),
        "insert": (
            "INSERT INTO"
            " %s(quadkey, aggregated_source_id, occupancy_case,"
            " exposure_entity, data_unit_id, size_data_unit_tile_area,"
            " size_data_unit_tile_built_up_area, fraction_data_unit_area,"
            " fraction_data_unit_built_up_area)"
            " VALUES('%s','%s','%s','%s','%s','%s','%s','%s','%s');"
        ),
    }

    # Retrieve the columns only once, they do not change while looping over the tiles
    tiles = data_unit.data_unit_tiles
    quadkeys = tiles["quadkey"].values
    tile_areas = tiles["size_data_unit_tile_area"].values
    tile_built_up_areas = tiles["size_data_unit_tile_built_up_area"].values
    fractions_area = tiles["fraction_data_unit_area"].values
    fractions_built_up_area = tiles["fraction_data_unit_built_up_area"].values
    source_id = str(aggregated_source_id)

    db_gde_tiles = Database(**db_data_unit_tiles_config)
    db_gde_tiles.create_connection_and_cursor()

    try:
        for i in range(tiles.shape[0]):
            # Fields that identify this data-unit tile in the table
            tile_key = (quadkeys[i], source_id, occupancy_case, data_unit.id)
            # Fields that get inserted/updated for this data-unit tile
            tile_values = (
                tile_areas[i],
                tile_built_up_areas[i],
                fractions_area[i],
                fractions_built_up_area[i],
            )

            # Check if an entry already exists for this data-unit tile
            db_gde_tiles.cursor.execute(sql_commands["query"] % ((db_table,) + tile_key))
            number_entries = db_gde_tiles.cursor.fetchall()[0][0]

            if number_entries == 0:  # No entry for this data-unit tile exists --> append
                db_gde_tiles.cursor.execute(
                    sql_commands["insert"]
                    % (
                        (
                            db_table,
                            quadkeys[i],
                            source_id,
                            occupancy_case,
                            exposure_entity_code,
                            data_unit.id,
                        )
                        + tile_values
                    )
                )
            elif number_entries == 1:  # Entry exists --> update
                db_gde_tiles.cursor.execute(
                    sql_commands["update"] % ((db_table,) + tile_values + tile_key)
                )
            else:  # More than one entries found, this is an error
                logger.error(
                    "ERROR IN write_data_unit_tiles_to_database: "
                    "MORE THAN ONE ENTRY FOUND FOR quadkey='%s' AND aggregated_source_id='%s' "
                    "AND occupancy_case='%s' AND data_unit_id='%s'" % tile_key
                )
    finally:
        # Release the connection even if one of the SQL statements has failed
        db_gde_tiles.close_connection()

    return
......@@ -40,6 +40,7 @@ setup(
"shapely",
"psycopg2-binary",
"python-dotenv",
"iso3166",
],
extras_require={
"tests": tests_require,
......
......@@ -4,6 +4,7 @@ data_pathname: /some/path/to/directory
boundaries_pathname: /some/path/to/directory
occupancies_to_run: residential, commercial, industrial
exposure_entities_to_run: all
exposure_entities_code: ISO3
number_cores: some
database_built_up:
host: host.somewhere.xx
......
......@@ -4,5 +4,6 @@ data_pathname: /some/path/to/directory
boundaries_pathname: /some/path/to/directory
occupancies_to_run: residential, commercial, industrial
exposure_entities_to_run: all
exposure_entities_code: ISO3
number_cores: 4
database_built_up: some_database_name
......@@ -4,6 +4,7 @@ data_pathname: /some/path/to/directory
boundaries_pathname: /some/path/to/directory
occupancies_to_run: xxx
exposure_entities_to_run: all
exposure_entities_code: ISO3
number_cores: 1
database_built_up:
host: xxx
......
......@@ -4,6 +4,7 @@ data_pathname: /some/path/to/directory
boundaries_pathname: /some/path/to/directory
occupancies_to_run: residential, commercial, industrial
exposure_entities_to_run: all
exposure_entities_code: ISO3
number_cores: 4
database_built_up:
host: host.somewhere.xx
......
......@@ -4,6 +4,7 @@ data_pathname: /some/path/to/directory
boundaries_pathname: /some/path/to/directory
occupancies_to_run: residential
exposure_entities_to_run: Name1
exposure_entities_code: ISO3
number_cores: 4
database_built_up:
host: host.somewhere.xx
......
......@@ -4,6 +4,7 @@ data_pathname: /some/path/to/directory
boundaries_pathname: /some/path/to/directory
occupancies_to_run: residential, commercial, industrial
exposure_entities_to_run: Name1, Name2, Name3
exposure_entities_code: ISO3
number_cores: 4
database_built_up:
host: host.somewhere.xx
......
......@@ -34,6 +34,7 @@ def test_Configuration():
assert returned_config.boundaries_pathname == "/some/path/to/directory"
assert returned_config.occupancies_to_run == ["residential", "commercial", "industrial"]
assert returned_config.exposure_entities_to_run == ["all"]
assert returned_config.exposure_entities_code == "ISO3"
assert returned_config.number_cores == 4
assert len(returned_config.database_built_up.keys()) == 4
assert returned_config.database_built_up["host"] == "host.somewhere.xx"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment