Commit b9650126 authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Added functionality to read names of data units

parent 8ab32c13
Pipeline #25438 passed with stage
in 1 minute and 11 seconds
model_name: esrm20
exposure_format: esrm20 # Only supported value for now
metadata_filepath: full_path_to_xlsx_file_with_model_metadata
data_pathname: path_to_directory_with_model_data
occupancies_to_run: residential, commercial # From 'occupancy_cases', industrial not supported
......@@ -16,6 +16,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import os
import abc
import logging
import numpy
......@@ -100,6 +101,13 @@ class ExposureModelESRM20(AggregatedExposureModel):
types of data_units.
self.filename_pattern (str):
Pattern of the names of the ESRM20 CSV files.
self.file_structure (dict):
Dictionary specifying the file structure of the ESRM20 model files within the data
pathname specified in the configuration file. It contains the following subkeys:
metadata (str):
Relative path to the metadata .xlsx file.
CSVs (str):
Relative path to the folder that contains the CSV files per exposure entity.
self.exposure_entities (dictionary of ExposureEntity):
Dictionary of instances of ExposureEntity objects, each of which represents an
exposure entity where the input aggregated exposure model is defined. See
......@@ -128,8 +136,12 @@ class ExposureModelESRM20(AggregatedExposureModel):
}
self.filename_pattern = {
"filename": "Exposure_Model_%s_%s.csv",
"first": "entity_name",
"second": "occupancy_short",
"first": "name",
"second": "short",
}
self.file_structure = {
"metadata": "sources/European_Exposure_Model_Data_Inputs_Sources.xlsx",
"CSVs": "_exposure_models",
}
self.exposure_entities = self.retrieve_exposure_entities(configuration)
......@@ -140,12 +152,12 @@ class ExposureModelESRM20(AggregatedExposureModel):
Args:
configuration (Configuration object):
Instance of the Configuration class, with at least the following attributes:
exposure_format (string):
exposure_format (str):
Format of the input aggregated model. Currently supported values:
"esrm20" (any other input will return an empty dictionary).
metadata_filepath (string):
Full file path to the .xlsx file that contains metadata on the input
aggregated exposure model.
data_pathname (str):
Path to the directory that contains the input aggregated exposure model
data.
Returns:
exposure_entities (dictionary of ExposureEntity):
......@@ -169,7 +181,7 @@ class ExposureModelESRM20(AggregatedExposureModel):
# Read the file (errors will be handled by pandas)
metadata = pandas.read_excel(
configuration.metadata_filepath,
os.path.join(configuration.data_pathname, self.file_structure["metadata"]),
sheet_name=self.occupancy_cases[case]["sheet_name"],
header=None, # Otherwise we cannot handle repeated column names properly
index_col=0, # Use first column as index
......@@ -185,9 +197,9 @@ class ExposureModelESRM20(AggregatedExposureModel):
# Check if there are repeated names of exposure entities (terminate if True)
if len(read_names) != len(numpy.unique(read_names)):
logger.critical(
"ERROR: REPEATED NAMES OF EXPOSURE ENTITIES FOUND "
"IN OCCUPANCY CASE %s. "
"retrieve_exposure_entities COULD NOT RUN." % (case)
"Error: repeated names of exposure entities found "
"in occupancy case %s. "
"'retrieve_exposure_entities' could not run." % (case)
)
break
......@@ -205,7 +217,12 @@ class ExposureModelESRM20(AggregatedExposureModel):
]
if len(data_units_types_row.shape) > 1: # This should not occur
logger.critical(
"ERROR READING %s: ROW NOT FOUND." % (configuration.metadata_filepath)
"Error reading %s: row not found."
% (
os.path.join(
configuration.data_pathname, self.file_structure["metadata"]
)
)
)
data_units_types_row = data_units_types_row.iloc[0, :]
data_units_types_row.iloc[:] = "unknown"
......@@ -227,6 +244,56 @@ class ExposureModelESRM20(AggregatedExposureModel):
return exposure_entities
def get_data_units_names(self, configuration, exposure_entity, occupancy_case):
    """Collect the names of the data units of one exposure entity and occupancy case.

    The names are read from the column "ID_X" of the ESRM20 CSV file of the exposure
    entity and occupancy case, where X is the 'data_units_level' stored for that occupancy
    case. The column is forced to be read as strings.

    Args:
        configuration (Configuration object):
            Instance of the Configuration class. It is passed on to
            'self._read_data_table', which needs at least the following attribute:
                data_pathname (str):
                    Path to the directory that contains the input aggregated exposure
                    model data.
        exposure_entity (ExposureEntity object):
            Instance of the ExposureEntity class, with at least the following attributes:
                name (str):
                    Name of the exposure entity (used in the error message).
                occupancy_cases (dict):
                    Dictionary that has 'occupancy_case' as one of its keys, with the
                    existing subkey 'data_units_level'.
        occupancy_case (str):
            Name of the occupancy case (e.g. "residential", "commercial", "industrial")
            for which the names of the data units will be retrieved. It needs to exist as
            a key of exposure_entity.occupancy_cases.

    Returns:
        None. The function updates 'exposure_entity' in place, storing the list of unique
        values of the "ID_X" column under
        exposure_entity.occupancy_cases[occupancy_case]["data_units_names"]. If the column
        does not exist in the CSV file, an empty list is stored and a critical message is
        logged. Errors raised while reading the CSV file are not caught here.
    """

    case_attributes = exposure_entity.occupancy_cases[occupancy_case]
    id_column = "ID_%s" % (str(case_attributes["data_units_level"]))

    # Force the ID column to be read as text (all other columns are left to Pandas)
    table = self._read_data_table(
        configuration, exposure_entity, occupancy_case, {id_column: str}
    )

    try:
        unit_names = list(table[id_column].unique())
    except KeyError:
        # The expected ID column is not part of the CSV file
        case_attributes["data_units_names"] = []
        logger.critical(
            "Error while retrieving 'data_units_names' of %s, %s: column `%s` not found"
            % (exposure_entity.name, occupancy_case, id_column)
        )
    else:
        case_attributes["data_units_names"] = unit_names
def _map_data_units_types(self, original_description):
"""This function maps original descriptions of resolution/definition of an input
aggregated exposure model to attributes of the data units that an ExposureEntity
......@@ -264,7 +331,7 @@ class ExposureModelESRM20(AggregatedExposureModel):
if "admin level" in original_description:
data_units_type = "polygon"
data_units_level = str(original_description.split(" ")[-1])
data_units_level = str(original_description.split(" ")[2]) # [-1] may be " "
data_units_definition = "NUTS"
elif "30 arc seconds" in original_description:
data_units_type = "cell"
......@@ -276,3 +343,45 @@ class ExposureModelESRM20(AggregatedExposureModel):
data_units_definition = "unknown"
return data_units_type, data_units_level, data_units_definition
def _read_data_table(self, configuration, exposure_entity, occupancy_case, datatypes):
    """Read the ESRM20 CSV file of an exposure entity and occupancy case.

    The name of the file is built from self.filename_pattern: the "filename" template is
    filled in with (1) the attribute of 'exposure_entity' whose name is stored under
    "first" and (2) the value of self.occupancy_cases[occupancy_case] whose key is stored
    under "second". The file is looked for in the sub-directory self.file_structure["CSVs"]
    of configuration.data_pathname.

    Args:
        configuration (Configuration object):
            Instance of the Configuration class, with at least the following attribute:
                data_pathname (str):
                    Path to the directory that contains the input aggregated exposure
                    model data.
        exposure_entity (ExposureEntity object):
            Instance of the ExposureEntity class. It needs to have the attribute whose name
            is indicated by self.filename_pattern["first"].
        occupancy_case (str):
            Name of the occupancy case (e.g. "residential", "commercial", "industrial")
            for which the CSV file will be read. It needs to be a key of
            self.occupancy_cases.
        datatypes (dict):
            Data types to enforce on specific columns of the CSV file, passed on to Pandas
            as 'dtype'. Pandas decides the data type of any column not listed here.

    Returns:
        data_table (Pandas DataFrame):
            Contents of the CSV file, as returned by pandas.read_csv. Errors while reading
            the file are not handled here (they are raised by Pandas).
    """

    pattern = self.filename_pattern
    template = pattern["filename"]
    first_component = getattr(exposure_entity, pattern["first"])
    second_component = self.occupancy_cases[occupancy_case][pattern["second"]]

    csv_filepath = os.path.join(
        configuration.data_pathname,
        self.file_structure["CSVs"],
        template % (first_component, second_component),
    )

    return pandas.read_csv(csv_filepath, dtype=datatypes, sep=",")
......@@ -30,12 +30,17 @@ class Configuration:
Name of the input aggregated model.
self.exposure_format (str):
Format of the input aggregated model. Currently supported values: "esrm20".
self.metadata_filepath (str):
Full file path to the .xlsx file that contains metadata on the input aggregated
exposure model.
self.data_pathname (str):
Path to the directory that contains the input aggregated exposure model data.
self.occupancies_to_run (list of str):
List of keys of occupancy_cases of the input aggregated exposure model for which
data will be retrieved.
"""
REQUIRES = ["metadata_filepath"]
REQUIRES = [
"data_pathname",
"occupancies_to_run",
]
def __init__(self, filepath):
"""
......@@ -49,14 +54,15 @@ class Configuration:
self.model_name = self._assign_parameter(config, "model_name")
self.exposure_format = self._assign_parameter(config, "exposure_format")
self.metadata_filepath = self._assign_parameter(config, "metadata_filepath")
self.data_pathname = self._assign_parameter(config, "data_pathname")
self.occupancies_to_run = self._assign_listed_parameters(config, "occupancies_to_run")
# Terminate if critical parameters are missing (not all parameters are critical)
for key_parameter in self.REQUIRES:
if not getattr(self, key_parameter):
error_message = (
"ERROR: PARAMETER '%s' COULD NOT BE RETRIEVED FROM "
"CONFIGURATION FILE. THE PROGRAM CANNOT RUN." % (key_parameter)
"Error: parameter '%s' could not be retrieved from "
"configuration file. The program cannot run." % (key_parameter)
)
logger.critical(error_message)
raise OSError(error_message)
......@@ -79,7 +85,7 @@ class Configuration:
with open(filepath, "r") as ymlfile:
config = yaml.load(ymlfile, Loader=yaml.FullLoader)
except FileNotFoundError:
logger.critical("ERROR instantiating Configuration: configuration file not found")
logger.critical("Error instantiating Configuration: configuration file not found")
config = {}
return config
......@@ -103,8 +109,37 @@ class Configuration:
assigned_parameter = config[input_parameter]
except KeyError:
logger.warning(
"WARNING: parameter '%s' is missing from configuration file" % (input_parameter)
"Warning: parameter '%s' is missing from configuration file" % (input_parameter)
)
assigned_parameter = None
return assigned_parameter
def _assign_listed_parameters(self, config, input_parameter):
    """This function searches for the key input_parameter in the dictionary config, and
    returns its assigned value as a list of strings.

    The value is split at each comma and surrounding whitespace is removed from each
    element, so that "Name_A, Name_B", "Name_A,Name_B" and "Name_A ,  Name_B" all lead to
    the same result. If the value is already a list or tuple (e.g. because it was written
    as a sequence in the configuration file), its elements are used directly. Empty
    elements are discarded.

    If input_parameter is not a key of config (or its value is None), the output is None.

    Args:
        config (dictionary):
            The configuration file read as a dictionary. It may be an empty dictionary.
        input_parameter (str):
            Name of the desired parameter, to be searched for as a primary key of config.

    Returns:
        assigned_parameter (list of str, or None):
            Each element of the list is an element of config[input_parameter]. E.g. if
            'config[input_parameter]' is "Name_A, Name_B", 'assigned_parameter' is
            ["Name_A", "Name_B"]. The list is empty if no non-empty element is found.
            None is returned if the parameter could not be retrieved.
    """

    assigned_parameter = self._assign_parameter(config, input_parameter)

    if assigned_parameter is None:
        return None

    if isinstance(assigned_parameter, (list, tuple)):
        # The value was already parsed as a sequence: no splitting is needed
        raw_elements = [str(element) for element in assigned_parameter]
    else:
        # str() avoids an AttributeError if the value was parsed as a non-string scalar
        raw_elements = str(assigned_parameter).split(",")

    # Strip whitespace so that the result does not depend on the spacing around commas,
    # and drop empty elements (e.g. due to a trailing comma or an empty string)
    stripped_elements = (element.strip() for element in raw_elements)

    return [element for element in stripped_elements if element]
......@@ -63,6 +63,9 @@ class ExposureEntity:
| | (if "polygon" type) in which the data unit is defined. E.g.
| | "WGS84" for cells, "NUTS" for polygons that represent
| | administrative units.
| |_ data_units_names (list of str):
| | List of names of the data units associated with this
| | ExposureEntity and occupancy case.
|_ occupancy_cases.keys()[1]
| |_ data_units_type: ...
|_ ...
......
......@@ -40,29 +40,55 @@ def main():
# Retrieve list of exposure entities covered by the input aggregated model and the types of
# data units for which their exposure models are defined for different occupancy cases
if config.exposure_format.lower() not in EXPOSURE_MODELS:
raise IOError("ERROR: exposure_format NOT SUPPORTED")
raise OSError("'exposure_format' indicated in config.yml is not supported")
aem = EXPOSURE_MODELS[config.exposure_format.lower()](config)
if len(aem.exposure_entities) < 1:
raise ValueError("no exposure entities found in %s" % (aem.model_name))
# Retrieve names of data units per exposure entity and occupancy case
for exposure_entity_name in aem.exposure_entities:
for occupancy_case in config.occupancies_to_run:
aem.get_data_units_names(
config, aem.exposure_entities[exposure_entity_name], occupancy_case
)
print("Name of the model: %s" % (aem.model_name))
print("Format: %s" % (aem.exposure_format))
if aem.occupancy_cases is not None:
print("Occupancy cases: %s" % (", ".join(aem.occupancy_cases)))
if len(aem.exposure_entities.keys()) > 0:
print("Data retrieved:")
for exposure_entity in aem.exposure_entities.keys():
print(" %s:" % exposure_entity)
for case in aem.exposure_entities[exposure_entity].occupancy_cases.keys():
print(" %s:" % case)
for attr in ["data_units_type", "data_units_level", "data_units_definition"]:
print(
" %s: %s"
% (
attr,
aem.exposure_entities[exposure_entity].occupancy_cases[case][attr],
)
print("Data retrieved:")
for exposure_entity in aem.exposure_entities.keys():
print(" %s:" % exposure_entity)
for case in aem.exposure_entities[exposure_entity].occupancy_cases.keys():
print(" %s:" % case)
for attr in ["data_units_type", "data_units_level", "data_units_definition"]:
print(
" %s: %s"
% (
attr,
aem.exposure_entities[exposure_entity].occupancy_cases[case][attr],
)
)
if (
"data_units_names"
in aem.exposure_entities[exposure_entity].occupancy_cases[case]
):
if (
len(
aem.exposure_entities[exposure_entity].occupancy_cases[case][
"data_units_names"
]
)
> 0
):
print(" data_units_names: retrieved")
else:
print(" data_units_names: not retrieved")
else:
print(" data_units_names: not retrieved")
# Leave the program
logger.info("gde-importer has finished")
......
lon,lat,taxonomy,number,ID_1
20.1,47.3,A,35.2,Unit_X
20.1,47.3,B,12.7,Unit_X
20.1,47.3,C,8.9,Unit_X
20.1,47.3,C,5.7,Unit_X
19.8,47.4,A,25.6,Unit_Y
19.8,47.4,A,7.9,Unit_Y
19.8,47.4,B,23.5,Unit_Y
20.4,46.9,A,40.2,Unit_Z
20.4,46.9,B,16.3,Unit_Z
20.4,46.9,C,33.4,Unit_Z
lon,lat,taxonomy,number,ID_1
20.1,47.3,A,35.2,Unit_1
20.1,47.3,B,12.7,Unit_1
20.1,47.3,C,8.9,Unit_1
20.1,47.3,C,5.7,Unit_1
19.8,47.4,A,25.6,Unit_2
19.8,47.4,A,7.9,Unit_2
19.8,47.4,B,23.5,Unit_2
20.4,46.9,A,40.2,Unit_3
20.4,46.9,B,16.3,Unit_3
20.4,46.9,C,33.4,Unit_3
lon,lat,taxonomy,number,ID_2
20.6,46.8,A,38.72,Unit_A
20.6,46.8,B,13.97,Unit_A
20.6,46.8,B,9.79,Unit_A
20.6,46.8,C,6.27,Unit_A
20.3,46.9,A,28.16,Unit_B
20.3,46.9,C,8.69,Unit_B
20.3,46.9,C,25.85,Unit_B
20.9,46.4,A,44.22,Unit_C
20.9,46.4,B,17.93,Unit_C
20.9,46.4,C,36.74,Unit_C
lon,lat,taxonomy,number,ID_1
20.6,46.8,A,38.72,Unit_1
20.6,46.8,B,13.97,Unit_1
20.6,46.8,B,9.79,Unit_1
20.6,46.8,C,6.27,Unit_1
20.3,46.9,A,28.16,Unit_2
20.3,46.9,C,8.69,Unit_2
20.3,46.9,C,25.85,Unit_2
20.9,46.4,A,44.22,Unit_3
20.9,46.4,B,17.93,Unit_3
20.9,46.4,C,36.74,Unit_3
model_name: esrm20
exposure_format: esrm20
metadata_filepath: /some/path/metadata.xlsx
occupancy_cases:
residential:
sheet_name: RES
data_units_types_field: Admin level resolution/aggregation
commercial:
sheet_name: COM
data_units_types_field: Admin level resolution/aggregation
industrial:
sheet_name: IND
data_units_types_field: Resolution
data_pathname: /some/path/to/directory
occupancies_to_run: residential, commercial, industrial
model_name: esrm20
exposure_format: esrm20
data_pathname: /some/path/to/directory
occupancies_to_run: residential
Variables,Country_1,Country_2,Country_3,Country_4,Country_5,Country_6,Country_7,Country_8
Another parameter 1,15,12,30,59,65,52,59,"201,200"
Resolution,30 arc seconds,admin level 1,admin level 2,30 arc seconds,30 arc seconds,random content,admin level 1,30 arc seconds
Another parameter 1,400,129,672,324,279,629,645,921
......@@ -27,32 +27,20 @@ def test_ExposureModelESRM20():
config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
)
config.metadata_filepath = os.path.join(
os.path.dirname(__file__), "data", "something_that_does_not_exist.xlsx"
config.data_pathname = os.path.join(
os.path.dirname(__file__), "something_that_does_not_exist"
)
with pytest.raises(FileNotFoundError) as excinfo:
returned_aem = ExposureModelESRM20(config)
assert "FileNotFoundError" in str(excinfo.type)
# Test case in which the metadata file is not an .xlsx file
config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
)
config.metadata_filepath = os.path.join(
os.path.dirname(__file__), "data", "metadata_for_testing.csv"
)
with pytest.raises(ValueError) as excinfo:
returned_aem = ExposureModelESRM20(config)
assert "ValueError" in str(excinfo.type)
# Test case in which there are repeated names of exposure entities in the metadata
config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
)
config.metadata_filepath = os.path.join(
os.path.dirname(__file__), "data", "metadata_for_testing_repeated_col.xlsx"
config.data_pathname = os.path.join(
os.path.dirname(__file__), "data", "ExposureModelESRM20_repeated_col"
)
returned_aem = ExposureModelESRM20(config)
......@@ -62,8 +50,8 @@ def test_ExposureModelESRM20():
config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
)
config.metadata_filepath = os.path.join(
os.path.dirname(__file__), "data", "metadata_for_testing_repeated_field.xlsx"
config.data_pathname = os.path.join(
os.path.dirname(__file__), "data", "ExposureModelESRM20_repeated_field"
)
returned_aem = ExposureModelESRM20(config)
......@@ -91,8 +79,8 @@ def test_ExposureModelESRM20():
config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
)
config.metadata_filepath = os.path.join(
os.path.dirname(__file__), "data", "metadata_for_testing.xlsx"
config.data_pathname = os.path.join(
os.path.dirname(__file__), "data", "ExposureModelESRM20_non_trivial"
)
returned_aem = ExposureModelESRM20(config)
......@@ -173,3 +161,49 @@ def test_ExposureModelESRM20():
]
== expected_contents["Entity_%s" % (i + 1)][case][parameter]
)
# Go on to test the retrieval of names of the data units
exposure_entities_to_run = ["Entity_1", "Entity_2"]
occupancies_to_run = ["residential", "commercial"]
config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
)
config.data_pathname = os.path.join(
os.path.dirname(__file__), "data", "ExposureModelESRM20_non_trivial"
)
for exposure_entity_name in exposure_entities_to_run:
for occupancy_case in occupancies_to_run:
returned_aem.get_data_units_names(
config, returned_aem.exposure_entities[exposure_entity_name], occupancy_case
)
assert returned_aem.exposure_entities["Entity_1"].occupancy_cases["residential"][
"data_units_names"
] == ["Unit_1", "Unit_2", "Unit_3"]
assert (
returned_aem.exposure_entities["Entity_2"].occupancy_cases["residential"][
"data_units_names"
]
== []
)
assert returned_aem.exposure_entities["Entity_1"].occupancy_cases["commercial"][
"data_units_names"
] == ["Unit_X", "Unit_Y", "Unit_Z"]
assert returned_aem.exposure_entities["Entity_2"].occupancy_cases["commercial"][
"data_units_names"
] == ["Unit_A", "Unit_B", "Unit_C"]
assert (
"data_units_names"
not in returned_aem.exposure_entities["Entity_1"].occupancy_cases["industrial"]
)
assert (
"data_units_names"
not in returned_aem.exposure_entities["Entity_4"].occupancy_cases["residential"]
)
# Test that Pandas raises a FileNotFoundError if the CSV file is not found
with pytest.raises(FileNotFoundError) as excinfo:
returned_aem.get_data_units_names(
config, returned_aem.exposure_entities["Entity_3"], "residential"
)
assert "FileNotFoundError" in str(excinfo.type)
......@@ -28,7 +28,8 @@ def test_Configuration():
)
assert returned_config.model_name == "esrm20"
assert returned_config.exposure_format == "esrm20"
assert returned_config.metadata_filepath == "/some/path/metadata.xlsx"
assert returned_config.data_pathname == "/some/path/to/directory"
assert returned_config.occupancies_to_run == ["residential", "commercial", "industrial"]
# Test case in which the file is not found
with pytest.raises(OSError) as excinfo:
......@@ -36,3 +37,9 @@ def test_Configuration():
os.path.join(os.path.dirname(__file__), "data", "doesnotexist.yml")
)
assert "OSError" in str(excinfo.type)
# Test case in which there is only one value for 'occupancies_to_run':
returned_config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "config_for_testing_one_list_val.yml")
)
assert returned_config.occupancies_to_run == ["residential"]
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment