Commit d2f8de46 authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Added feature to read and interpret exposure entities to run

parent b22b3bc8
Pipeline #40591 passed with stage
in 1 minute and 51 seconds
......@@ -5,3 +5,5 @@ database_gde_tiles: # Database where info on the GDE tiles is stored
port: port_number # Leave empty if a port number is not needed
username: username
password: password_of_username
exposure_entities_to_run: all # Either "all", a comma-space-separated list of entity names, or a name of a .txt or .csv file
exposure_entities_code: ISO3 # Either "ISO3" or a nested structure with exposure entity names and their 3-character codes
......@@ -16,8 +16,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import sys
import logging
from copy import deepcopy
from gdeimporter.tools.configuration_methods import ConfigurationMethods
from gdeimporter.exposureentity import ExposureEntity
from gdecore.database_queries import DatabaseQueries
logger = logging.getLogger()
......@@ -42,11 +46,25 @@ class Configuration:
User name to connect to the SQL database.
password (str):
Password associated with self.username.
self.exposure_entities_to_run (list of str):
List of names of the exposure entities for which the code will be run.
self.exposure_entities_code (str or dict):
If "ISO3" (str), the country ISO3 codes associated with the names of the exposure
entities will be automatically retrieved and used as their codes. Otherwise it needs
to be a dictionary whose keys are the names of the exposure entities. The content
within each key is a 3-character string to be used as the code for the corresponding
exposure entity. E.g.:
self.exposure_entities_code = {
"Exposure Entity 1": "EE1",
"Exposure Entity 2": "XXX"
}
"""
REQUIRES = [
"model_name",
"database_gde_tiles",
"exposure_entities_to_run",
"exposure_entities_code",
]
def __init__(self, filepath, force_config_over_hierarchies=False):
......@@ -70,6 +88,29 @@ class Configuration:
config, "database_gde_tiles", "test_db_gde_tiles.env", force_config_over_hierarchies
)
self.exposure_entities_to_run = ConfigurationMethods.assign_listed_parameters(
config, "exposure_entities_to_run"
)
try:
self.exposure_entities_code = ConfigurationMethods.validate_exposure_entities_code(
config
)
except ValueError as e:
error_message = (
"Error: the configuration file assigns unsupported values "
"to exposure_entities_code. The program cannot run. %s" % (e)
)
logger.critical(error_message)
sys.exit(1)
except TypeError as e:
error_message = (
"Error: the configuration file assigns an unsupported data type "
"to exposure_entities_code. The program cannot run. %s" % (e)
)
logger.critical(error_message)
sys.exit(1)
# Terminate if critical parameters are missing (not all parameters are critical)
for key_parameter in self.REQUIRES:
if getattr(self, key_parameter) is None:
......@@ -79,3 +120,89 @@ class Configuration:
)
logger.critical(error_message)
raise OSError(error_message)
def interpret_exposure_entities_to_run(self, aggregated_source_id=0):
    """This function interprets the value assigned to self.exposure_entities_to_run from the
    configuration file and updates self.exposure_entities_to_run accordingly.

    Args:
        aggregated_source_id (int):
            ID of the source of the aggregated exposure model to be run. Only needed if
            'exposure_entities_to_run' is "all" in the configuration file.

    Returns:
        The method returns None and updates self.exposure_entities_to_run as a function of
        its content. Possibilities:
            self.exposure_entities_to_run == ["all"] (case-insensitive):
                self.exposure_entities_to_run is updated to contain the list of 3-character
                codes of all exposure entities associated with 'aggregated_source_id' in the
                'data_units' table of the database self.database_gde_tiles.
            self.exposure_entities_to_run contains a list whose first element is a path to a
            .txt or .csv file:
                self.exposure_entities_to_run is updated to contain the list of 3-character
                codes of the exposure entities listed in the indicated file. Names in the
                file can be separated by commas and/or line breaks; whitespace around each
                name is ignored and empty entries (e.g. blank lines) are skipped.
            self.exposure_entities_to_run contains a list with one or more names of exposure
            entities:
                self.exposure_entities_to_run is updated to contain the list of 3-character
                codes of these names.
            Any other case (including an empty list):
                self.exposure_entities_to_run becomes an empty list.

        Names for which no code can be determined are skipped and a warning is logged.
    """

    requested = self.exposure_entities_to_run

    # Guard against empty input before looking at the first element
    if not requested:
        self.exposure_entities_to_run = []
        return

    first_entry = requested[0]

    if first_entry.lower() == "all":
        # Retrieve 3-char codes of all exposure entities associated with
        # aggregated_source_id in the 'data_units' table of self.database_gde_tiles
        self.exposure_entities_to_run = (
            DatabaseQueries.retrieve_all_exposure_entities_of_aggregated_source_id(
                aggregated_source_id, self.database_gde_tiles, "data_units"
            )
        )
        return

    if first_entry.lower().endswith((".txt", ".csv")):
        # Retrieve names of exposure entities from the indicated file
        exposure_entities_full_names = []
        with open(first_entry, "r") as f:
            for row in f:
                for element in row.split(","):
                    name = element.strip()  # drops line breaks and padding spaces
                    if name:
                        exposure_entities_full_names.append(name)
    else:
        # Keep the original content (one or several names are listed)
        exposure_entities_full_names = list(requested)

    exposure_entities_to_run = []

    if isinstance(self.exposure_entities_code, str):
        # If so, it's already been validated that it's "ISO3"
        for full_name in exposure_entities_full_names:
            iso3_code = ExposureEntity.retrieve_country_ISO3(full_name)
            if iso3_code is not None:
                exposure_entities_to_run.append(iso3_code)
            else:
                logger.warning(
                    "ExposureEntity.retrieve_country_ISO3 has returned 'None' for exposure "
                    "entity %s. %s will not be run.",
                    full_name,
                    full_name,
                )
    elif isinstance(self.exposure_entities_code, dict):
        for full_name in exposure_entities_full_names:
            if full_name in self.exposure_entities_code:
                exposure_entities_to_run.append(self.exposure_entities_code[full_name])
            else:
                logger.warning(
                    "'exposure_entities_code' in the configuration file does not contain a "
                    "code for exposure entity %s. %s will not be run.",
                    full_name,
                    full_name,
                )

    self.exposure_entities_to_run = exposure_entities_to_run

    return
......@@ -17,6 +17,7 @@
# along with this program. If not, see http://www.gnu.org/licenses/.
import logging
import numpy
from gdeimporter.tools.database import Database
......@@ -84,3 +85,62 @@ class DatabaseQueries:
aggregated_source_id = -999
return aggregated_source_id
@staticmethod
def retrieve_all_exposure_entities_of_aggregated_source_id(
    aggregated_source_id, db_gde_tiles_config, db_table
):
    """This function retrieves the 3-character codes of all exposure entities associated
    with 'aggregated_source_id' in 'db_table' of the database whose credentials are given by
    'db_gde_tiles_config'.

    Args:
        aggregated_source_id (int):
            ID of the source of the aggregated exposure model to be run.
        db_gde_tiles_config (dict):
            Dictionary containing the credentials needed to connect to the SQL database in
            which information on exposure entities is stored. The keys of the dictionary
            need to be:
                host (str):
                    SQL database host address.
                dbname (str):
                    Name of the SQL database.
                port (int):
                    Port where the SQL database can be found.
                username (str):
                    User name to connect to the SQL database.
                password (str):
                    Password associated with 'username'.
        db_table (str):
            Name of the table of the SQL database from which the exposure entities can be
            retrieved. It is assumed that this table contains, at least, the following
            fields:
                aggregated_source_id (int):
                    ID of the source of the aggregated exposure model.
                exposure_entity (str):
                    3-character code of the exposure entity.

    Returns:
        exposure_entities (list of str):
            Sorted list of the unique 3-character codes of the exposure entities associated
            with 'aggregated_source_id'. Empty if no rows match.
    """

    # NOTE(review): 'db_table' and 'aggregated_source_id' are interpolated directly into
    # the SQL text; callers must only pass trusted values.
    sql_query = "SELECT exposure_entity FROM %s WHERE aggregated_source_id=%s;"

    db_gde_tiles = Database(**db_gde_tiles_config)
    db_gde_tiles.create_connection_and_cursor()

    try:
        db_gde_tiles.cursor.execute(sql_query % (db_table, aggregated_source_id))
        exec_result = db_gde_tiles.cursor.fetchall()
    finally:
        # Release the connection even if the query fails
        db_gde_tiles.close_connection()

    # The first column of each row is the code; duplicates arise because several rows of
    # the table can refer to the same exposure entity
    exposure_entities = sorted({row[0] for row in exec_result})

    return exposure_entities
......@@ -53,6 +53,20 @@ def main():
% (config.model_name, aggregated_source_id)
)
# Interpret and update config.exposure_entities_to_run
config.interpret_exposure_entities_to_run(aggregated_source_id)
if len(config.exposure_entities_to_run) < 1:
error_message = "Attribute 'exposure_entities_to_run' of configuration is an empty list"
raise OSError(error_message)
logger.info(
"%s exposure entity(ies) will be run: %s"
% (
str(len(config.exposure_entities_to_run)),
", ".join(config.exposure_entities_to_run),
)
)
# Leave the program
logger.info("gde-core has finished")
sys.exit()
......
......@@ -19,7 +19,7 @@
from setuptools import setup, find_packages
tests_require = ["pytest"]
linters_require = ["black>=20.8b1", "pylint", "flake8"]
linters_require = ["black>=22.1.0", "pylint", "flake8"]
setup(
name="gde-core",
......
......@@ -4,3 +4,5 @@ database_gde_tiles:
dbname: some_database_name
username: some_username
password: some_password
exposure_entities_to_run: Italy
exposure_entities_code: ISO3
United_Kingdom,Lithuania,Greece
\ No newline at end of file
Spain
Turkey
Germany
\ No newline at end of file
Entity1
Entity2
\ No newline at end of file
DROP TABLE IF EXISTS aggregated_sources;
DROP TABLE IF EXISTS data_units;
DROP TYPE IF EXISTS occupancycase;
DROP EXTENSION IF EXISTS postgis;
CREATE EXTENSION postgis;
CREATE TYPE occupancycase AS ENUM ('residential', 'commercial', 'industrial');
CREATE TABLE aggregated_sources
(
......@@ -11,3 +18,30 @@ VALUES ('esrm20', 'esrm20'),
('second_source', 'bbb'),
('third_source', 'ccc'),
('first_source', 'ddd');
CREATE TABLE data_units
(
data_unit_id VARCHAR,
occupancy_case occupancycase,
aggregated_source_id SMALLINT,
exposure_entity CHAR(3),
buildings_total FLOAT,
dwellings_total FLOAT,
people_census FLOAT,
cost_total FLOAT,
geometry GEOMETRY,
PRIMARY KEY (data_unit_id, occupancy_case, aggregated_source_id)
);
INSERT INTO data_units(data_unit_id,
occupancy_case,
aggregated_source_id,
exposure_entity,
buildings_total,
dwellings_total,
people_census,
cost_total)
VALUES ('ABC_123456', 'residential', 2, 'ABC', 0.0, 0.0, 0.0, 0.0),
('ABC_123456', 'commercial', 2, 'ABC', 0.0, 0.0, 0.0, 0.0),
('ABC_456789', 'residential', 2, 'ABC', 0.0, 0.0, 0.0, 0.0),
('DEF_456789', 'residential', 2, 'DEF', 0.0, 0.0, 0.0, 0.0);
......@@ -33,6 +33,9 @@ def test_Configuration():
assert returned_config.database_gde_tiles["dbname"] == "some_database_name"
assert returned_config.database_gde_tiles["username"] == "some_username"
assert returned_config.database_gde_tiles["password"] == "some_password"
assert len(returned_config.exposure_entities_to_run) == 1
assert returned_config.exposure_entities_to_run[0] == "Italy"
assert returned_config.exposure_entities_code == "ISO3"
# Test case in which the file is not found
with pytest.raises(OSError) as excinfo:
......@@ -41,3 +44,82 @@ def test_Configuration():
force_config_over_hierarchies=True,
)
assert "OSError" in str(excinfo.type)
def test_Configuration_interpret_exposure_entities_to_run(test_db):
    """Check Configuration.interpret_exposure_entities_to_run for names, files and "all",
    using both "ISO3" and a dictionary as 'exposure_entities_code'."""
    data_dir = os.path.join(os.path.dirname(__file__), "data")

    config = Configuration(
        os.path.join(data_dir, "config_for_testing_good.yml"),
        force_config_over_hierarchies=False,
    )

    def interpret_and_check(expected_codes, *args):
        # Run the interpretation and compare the outcome with the expected codes, in order
        config.interpret_exposure_entities_to_run(*args)
        obtained = config.exposure_entities_to_run
        assert len(obtained) == len(expected_codes)
        for position, expected in enumerate(expected_codes):
            assert obtained[position] == expected

    # One country name provided (from the configuration file), 'exposure_entities_code' is ISO3
    interpret_and_check(["ITA"])

    # Several country names provided, 'exposure_entities_code' is ISO3
    config.exposure_entities_to_run = ["Italy", "France", "Portugal"]
    interpret_and_check(["ITA", "FRA", "PRT"])

    # Several country names provided, one does not exist, 'exposure_entities_code' is ISO3
    config.exposure_entities_to_run = ["Italy", "France", "England"]
    interpret_and_check(["ITA", "FRA"])

    # A name of a TXT file is provided, 'exposure_entities_code' is ISO3
    config.exposure_entities_to_run = [os.path.join(data_dir, "exposure_entities.txt")]
    interpret_and_check(["ESP", "TUR", "DEU"])

    # A name of a CSV file is provided, 'exposure_entities_code' is ISO3
    config.exposure_entities_to_run = [os.path.join(data_dir, "exposure_entities.csv")]
    interpret_and_check(["GBR", "LTU", "GRC"])

    # "all" is provided, 'exposure_entities_code' is ISO3
    config.exposure_entities_to_run = ["all"]
    interpret_and_check(["ABC", "DEF"], 2)

    non_countries_file = os.path.join(data_dir, "exposure_entities_non_countries.txt")

    # A name of a TXT file is provided, 'exposure_entities_code' is a dictionary
    config.exposure_entities_to_run = [non_countries_file]
    config.exposure_entities_code = {
        "Entity1": "EN1",
        "Entity2": "ET2",
    }
    interpret_and_check(["EN1", "ET2"])

    # A name of a TXT file is provided, 'exposure_entities_code' is a dictionary, one missing
    config.exposure_entities_to_run = [non_countries_file]
    config.exposure_entities_code = {
        "Entity1": "EN1",
        "Entity3": "ET3",
    }
    interpret_and_check(["EN1"])
......@@ -33,3 +33,40 @@ def test_retrieve_aggregated_source_id(test_db):
)
assert returned_aggregated_source_id == 2
def test_retrieve_all_exposure_entities_of_aggregated_source_id(test_db):
    """Check the exposure entities retrieved from 'data_units' for several source IDs."""
    # Database connection (the Configuration class will define the credentials based on whether
    # the code is running in the CI or locally)
    config_path = os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
    config = Configuration(config_path)

    cases = (
        # aggregated_source_id exists, several exposure entities associated with it
        (2, ("ABC", "DEF")),
        # aggregated_source_id exists, no exposure entities associated with it
        (3, ()),
        # aggregated_source_id does not exist
        (9999, ()),
    )

    for source_id, expected_codes in cases:
        returned_exposure_entities = (
            DatabaseQueries.retrieve_all_exposure_entities_of_aggregated_source_id(
                source_id, config.database_gde_tiles, "data_units"
            )
        )

        assert len(returned_exposure_entities) == len(expected_codes)
        for code in expected_codes:
            assert code in returned_exposure_entities
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment