Commit ab258de7 authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Added basic configuration and exposure model definitions

parent d7c45b15
Pipeline #25024 passed with stage
in 1 minute and 37 seconds
......@@ -5,10 +5,12 @@
Pipfile
Pipfile.lock
.idea
.coverage
__pycache__
.cache
build
dist
env
venv
config.yml
......@@ -3,6 +3,41 @@
Importer of aggregated exposure models to serve as input for the Global Dynamic Exposure (GDE)
model.
## Installing obmgapanalysis
### Software dependencies
- Python >= 3.7
### Python libraries
- `numpy`
- `pandas`
- `pyyaml`
- `openpyxl`
### Install
```bash
git clone https://git.gfz-potsdam.de/dynamicexposure/globaldynamicexposure/gde-importer.git
cd gde-importer
pip3 install -e .
```
## Running gde-importer
To run the gde-importer for the exposure model of the European Seismic Risk Model 2020 (ESRM20):
1. Clone the ESRM20 repository to a local path of your choice following these
[instructions](https://git.gfz-potsdam.de/dynamicexposure/datasources/-/tree/master/esrm20).
2. Copy the file `config-example.yml` to your working directory as `config.yml` and provide the
necessary parameters:
- exposure_format: esrm20
- metadata_filepath: /local/path/to/cloned/European_Exposure_Model_Data_Inputs_Sources.xlsx
3. From the working directory (where you placed `config.yml`), run the code by typing:
```
gdeimporter
```
## Copyright and copyleft
Copyright (C) 2021
......
model_name: esrm20
exposure_format: esrm20 # Only supported value for now
metadata_filepath: full_path_to_xlsx_file_with_model_metadata
occupancy_cases: # Occupancy cases to be processed
residential:
sheet_name: RES # Sheet name in metadata_filepath
data_units_types_field: fieldnameRes # Field name in sheet_name
commercial:
sheet_name: COM # Sheet name in metadata_filepath
data_units_types_field: fieldnameCom # Field name in sheet_name
industrial:
sheet_name: IND # Sheet name in metadata_filepath
data_units_types_field: fieldnameInd # Field name in sheet_name
#!/usr/bin/env python3
# Copyright (C) 2021:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import abc
import logging
import numpy
import pandas
from gdeimporter.exposureentity import ExposureEntity
logger = logging.getLogger()
class AggregatedExposureModel(abc.ABC):
"""This class represents an input aggregated exposure model.
Attributes:
self.model_name (str):
Name of the input aggregated model.
self.exposure_format (str):
Format of the input aggregated model. Currently supported values: "esrm20".
self.occupancy_cases (list of str):
List of names of the occupancy cases for which the input aggregated exposure model
is defined.
self.exposure_entities (dictionary of ExposureEntity):
Dictionary of instances of ExposureEntity objects, each of which represent an
exposure entity where the input aggregated exposure model is defined. See
attributes in description of ExposureEntity. The keys of the dictionary are the
names of the corresponding exposure entities.
"""
def __init__(self, configuration):
self.model_name = configuration.model_name
self.exposure_format = configuration.exposure_format
self.occupancy_cases = list(configuration.occupancy_cases.keys())
self.exposure_entities = self.retrieve_exposure_entities(configuration)
def retrieve_exposure_entities(self, configuration):
"""This function retrieves the exposure entities for which an input aggregated exposure
model is defined, together with the definition of their types of data units for each
occupancy case, by reading and processing the relevant data (as specified in the
respective subclasses).
Exposure entities can be, for example, countries or any other spatial/administrative
unit for which an aggregated exposure model is defined. Arbitrary polygons can be
administrative units, Voronoi cells, etc.
Args:
configuration (Configuration object):
Instance of the Configuration class.
Returns:
exposure_entities (dictionary of ExposureEntity):
Dictionary of instances of ExposureEntity objects, each of which represent an
exposure entity where the input aggregated exposure model is defined. See
attributes in description of ExposureEntity. The keys of the dictionary are the
names of the corresponding exposure entities.
"""
raise NotImplementedError
class ExposureModelESRM20(AggregatedExposureModel):
"""This class represents the European Seismic Risk Model 2020 (ESRM20) aggregated exposure
model.
See details in https://git.gfz-potsdam.de/dynamicexposure/datasources/-/tree/master/esrm20.
"""
def retrieve_exposure_entities(self, configuration):
"""This function retrieves the exposure entities of the ESRM20 model from the
corresponding .xlsx metadata file.
Args:
configuration (Configuration object):
Instance of the Configuration class, with at least the following attributes:
exposure_format (string):
Format of the input aggregated model. Currently supported values:
"esrm20" (any other input will return an empty dictionary).
metadata_filepath (string):
Full file path to the .xlsx file that contains metadata on the input
aggregated exposure model.
occupancy_cases (dictionary):
Dictionary in which each first level key corresponds to an occupancy
case (e.g. "residential", "commercial", "industrial"). Each first level
key contains two sub-keys: "sheet_name" and "data_units_types_field":
sheet_name (str):
Name of the sheet in the meatadata file of the input aggregated
exposure model from which info on this occupancy case can be
retrieved. E.g.: "RES", "COM", "IND".
data_units_types_field (str):
Name of the field in sheet_name from which to retrieve
information on the types of data_units. E.g.: "Resolution".
Returns:
exposure_entities (dictionary of ExposureEntity):
Dictionary of instances of ExposureEntity objects, each of which represent an
exposure entity where the input aggregated exposure model is defined. See
attributes in description of ExposureEntity. The keys of the dictionary are the
names of the corresponding exposure entities.
"""
exposure_entities = {}
logger.info(
"Retrieving exposure_entities from exposure with format %s "
"with retrieve_exposure_entities" % configuration.exposure_format
)
# Needs to go by occupancy case because the names and properties of the exposure
# entities can only be read from the metadata file for a sheet that is associated
# with a particular occupancy case:
for case in configuration.occupancy_cases.keys():
# Read the file (errors will be handled by pandas)
metadata = pandas.read_excel(
configuration.metadata_filepath,
sheet_name=configuration.occupancy_cases[case]["sheet_name"],
header=None, # Otherwise we cannot handle repeated column names properly
index_col=0, # Use first column as index
)
# Retrieve names of exposure entities
read_names = numpy.array(metadata.loc["Variables", :])
# Fix the possibility that there might be other rows named "Variables" other
# than the first one
if len(read_names.shape) > 1:
read_names = read_names[0, :]
# Check if there are repeated names of exposure entities (terminate if True)
if len(read_names) != len(numpy.unique(read_names)):
logger.critical(
"ERROR: REPEATED NAMES OF EXPOSURE ENTITIES FOUND "
"IN OCCUPANCY CASE %s. "
"retrieve_exposure_entities COULD NOT RUN." % (case)
)
break
# Use first row as header, once it has been confirmed that there are no repeated
# names of exposure entities
new_header = metadata.loc["Variables", :]
if len(new_header.shape) > 1: # When there are more than one "Variables" row
new_header = new_header.iloc[0, :]
metadata = metadata[1:] # Keep the data below the header row
metadata.columns = new_header # Set the header row as the new column names
# Retrieve the row from which the types of data units can be interpreted
data_units_types_row = metadata.loc[
configuration.occupancy_cases[case]["data_units_types_field"], :
]
if len(data_units_types_row.shape) > 1: # This should not occur
logger.critical(
"ERROR READING %s: ROW NOT FOUND." % (configuration.metadata_filepath)
)
data_units_types_row = data_units_types_row.iloc[0, :]
data_units_types_row.iloc[:] = "unknown"
for exposure_entity in read_names:
if exposure_entity not in exposure_entities.keys():
exposure_entities[exposure_entity] = ExposureEntity(exposure_entity)
output = self._map_data_units_types(data_units_types_row.loc[exposure_entity])
(data_units_type, data_units_level, data_units_definition) = output
output = {
"data_units_type": data_units_type,
"data_units_level": data_units_level,
"data_units_definition": data_units_definition,
}
# Write the contents occupancy_cases to the ExposureEntity object
exposure_entities[exposure_entity].occupancy_cases[case] = output
return exposure_entities
def _map_data_units_types(self, original_description):
"""This function maps original descriptions of resolution/definition of an input
aggregated exposure model to attributes of the data units that an ExposureEntity
comprises.
Args:
original_description (str): String coming from an input aggregated exposure model.
Returns:
data_units_type (str):
Type of data unit used by the ExposureEntity. Currently supported types:
"polygon":
Polygon of arbitrary shape (e.g. boundaries of an administrative unit,
Voronoi cell).
"cell":
Regular quadrilateral in a specified projection that can be sufficiently
and unequivocally defined by knowing (1) the coordinates of a vertex or
the centroid, (2) the width, (3) the height, and (4) the projection
system. E.g. a 30 arcsec (width) by 30 arcsec cell (height) in the World
Geodetic System 1984 (WGS84).
data_units_level (str):
Level/resolution of the data units used by the ExposureEntity. Currently
supported types:
"30arcsec30arcsec":
Applies to the "cell" type and defines a 30 arcsec (width) by 30 arcsec
(height) cell in the World Geodetic System 1984 (WGS84).
Any integer >= 0:
Applies to the "polygon" type and refers to an administrative unit level
as per a classification system.
data_units_definition (str):
Name of the projection (if "cell" type) or classification system (if "polygon"
type) in which the data units used by the ExposureEntity are defined. E.g.
"WGS84" for cells, "NUTS" for polygons that represent administrative units.
"""
if "admin level" in original_description:
data_units_type = "polygon"
data_units_level = str(original_description.split(" ")[-1])
data_units_definition = "NUTS"
elif "30 arc seconds" in original_description:
data_units_type = "cell"
data_units_level = "30arcsec_30arcsec"
data_units_definition = "WGS84"
else:
data_units_type = "unknown"
data_units_level = "unknown"
data_units_definition = "unknown"
return data_units_type, data_units_level, data_units_definition
#!/usr/bin/env python3
# Copyright (C) 2021:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import logging
import yaml
logger = logging.getLogger()
class Configuration:
"""This class handles the configuration parameters of the gde-importer.
Attributes:
self.model_name (str):
Name of the input aggregated model.
self.exposure_format (str):
Format of the input aggregated model. Currently supported values: "esrm20".
self.metadata_filepath (str):
Full file path to the .xlsx file that contains metadata on the input aggregated
exposure model.
self.occupancy_cases (dictionary):
Dictionary in which each first level key corresponds to an occupancy case (e.g.
"residential", "commercial", "industrial"). Each first level key contains two
sub-keys: "sheet_name" and "data_units_types_field":
sheet_name (str):
Name of the sheet in the meatadata file of the input aggregated exposure
model from which info on this occupancy case can be retrieved. E.g.: "RES",
"COM", "IND".
data_units_types_field (str):
Name of the field in sheet_name from which to retrieve information on the
types of data_units. E.g.: "Resolution".
"""
REQUIRES = ["metadata_filepath", "occupancy_cases"]
def __init__(self, filepath):
"""
Args:
filepath (str):
Full file path to the .yml configuration file.
"""
config = self.read_config_file(filepath)
self.model_name = self._assign_parameter(config, "model_name")
self.exposure_format = self._assign_parameter(config, "exposure_format")
self.metadata_filepath = self._assign_parameter(config, "metadata_filepath")
self.occupancy_cases = self._assign_hierarchical_parameters(
config, "occupancy_cases", ["sheet_name", "data_units_types_field"]
)
# Terminate if critical parameters are missing (not all parameters are critical)
for key_parameter in self.REQUIRES:
if not getattr(self, key_parameter):
error_message = (
"ERROR: PARAMETER '%s' COULD NOT BE RETRIEVED FROM "
"CONFIGURATION FILE. THE PROGRAM CANNOT RUN." % (key_parameter)
)
logger.critical(error_message)
raise OSError(error_message)
def read_config_file(self, filepath):
"""This function attempts to open the configuration file. If not found, it returns an
empty dictionary and logs a critical error.
Args:
filepath (str):
Full file path to the .yml configuration file.
Returns:
config (dictionary):
The configuration file read as a dictionary, or an empty dictionary if the
configuration file was not found.
"""
try:
with open(filepath, "r") as ymlfile:
config = yaml.load(ymlfile, Loader=yaml.FullLoader)
except FileNotFoundError:
logger.critical("ERROR instantiating Configuration: configuration file not found")
config = {}
return config
def _assign_parameter(self, config, input_parameter):
"""This function searches for the key input_parameter in the dictionary config. If
found, it returns its value (a string or a dictionary). If not found, it returns None.
Args:
config (dictionary):
The configuration file read as a dictionary. It may be an empty dictionary.
input_parameter (str):
Name of the desired parameter, to be searched for as a primary key of config.
Returns:
assigned_parameter (str, dictionary or None):
The content of config[input_parameter], which can be a string or a dictionary.
It is None if input_parameter is not a key of config.
"""
try:
assigned_parameter = config[input_parameter]
except KeyError:
logger.warning(
"WARNING: parameter '%s' is missing from configuration file" % (input_parameter)
)
assigned_parameter = None
return assigned_parameter
def _assign_hierarchical_parameters(self, config, input_parameter, requested_nested):
"""This function searches for the key input_parameter in the dictionary config, and for
each of the elements of requested_nested as keys of config[input_parameter].
If input_parameter is not a key of config, the output is None.
If input_parameter is a key of config, but one of the elements of requested_nested is
not a key of config[input_parameter]
Args:
config (dictionary):
The configuration file read as a dictionary. It may be an empty dictionary.
input_parameter (str):
Name of the desired parameter, to be searched for as a primary key of config.
requested_nested (list of str):
List of the names of the desired nested parameters, to be searched for as keys
of config[input_parameter].
Returns:
assigned_parameter (dictionary or None):
The content of config[input_parameter], if input_parameter is a key of config
and all elements of requested_nested are keys of config[input_parameter], or
None otherwise.
"""
assigned_parameter = self._assign_parameter(config, input_parameter)
if assigned_parameter is None:
return None
if not isinstance(assigned_parameter, dict):
return None
sub_parameters_missing = False
for case in assigned_parameter.keys():
for requested_parameter in requested_nested:
if requested_parameter not in assigned_parameter[case].keys():
logger.critical(
"ERROR instantiating Configuration: occupancy case '%s' does not "
"contain a '%s' parameter" % (case, requested_parameter)
)
sub_parameters_missing = True
if sub_parameters_missing is True:
assigned_parameter = None
return assigned_parameter
#!/usr/bin/env python3
# Copyright (C) 2021:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import logging
logger = logging.getLogger()
class ExposureEntity:
"""This class represents a geographical unit where an exposure model is defined. In the
spatial sense is encompasses smaller geographical data units.
Attributes:
self.name (str):
Name of the exposure entity.
self.occupancy_cases (dict):
Dictionary definining the type, level and definition of the data units used for each
occupancy case of the model (e.g. residential, commercial), with the following
structure:
self.occupancy_cases
|_ occupancy_cases.keys()[0]
| |_ data_units_type (str):
| | Type of data unit used to define this occupancy case in this
| | ExposureEntity. Currently supported types:
| | "polygon":
| | Polygon of arbitrary shape (e.g. boundaries of an
| | administrative unit, Voronoi cell).
| | "cell":
| | Regular quadrilateral in a specified projection that can
| | be sufficiently and unequivocally defined by knowing (1)
| | the coordinates of a vertex or the centroid, (2) the
| | width, (3) the height, and (4) the projection system.
| | E.g. a 30 arcsec (width) by 30 arcsec cell (height) in
| | the World Geodetic System 1984 (WGS84).
| |_ data_units_level (str):
| | Level/resolution of the data unit used to define this occupancy
| | case in this ExposureEntity. Currently supported types:
| | "30arcsec_30arcsec":
| | Applies to the "cell" type and defines a 30 arcsec
| | (width) by 30 arcsec (height) cell in the World Geodetic
| | System 1984 (WGS84).
| | Any integer >= 0:
| | Applies to the "polygon" type and refers to an
| | administrative unit level as per a classification
| | system.
| |_ data_units_definition (str):
| | Name of the projection (if "cell" type) or classification system
| | (if "polygon" type) in which the data unit is defined. E.g.
| | "WGS84" for cells, "NUTS" for polygons that represent
| | administrative units.
|_ occupancy_cases.keys()[1]
| |_ data_units_type: ...
|_ ...
"""
def __init__(self, name):
self.name = name
self.occupancy_cases = {}
......@@ -18,19 +18,54 @@
import logging
import sys
from gdeimporter.configuration import Configuration
from gdeimporter.aggregatedexposuremodel import ExposureModelESRM20
# Add a logger printing error, warning, info and debug messages to the screen
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
EXPOSURE_MODELS = {"esrm20": ExposureModelESRM20}
def main():
# Example logging output
# Log the start of the run
logger.info("gde-importer has started")
# Read configuration parameters
config = Configuration("config.yml")
# Retrieve list of exposure entities covered by the input aggregated model and the types of
# data units for which their exposure models are defined for different occupancy cases
if config.exposure_format.lower() not in EXPOSURE_MODELS:
raise IOError("ERROR: exposure_format NOT SUPPORTED")
aem = EXPOSURE_MODELS[config.exposure_format.lower()](config)
print("Name of the model: %s" % (aem.model_name))
print("Format: %s" % (aem.exposure_format))
if aem.occupancy_cases is not None:
print("Occupancy cases: %s" % (", ".join(aem.occupancy_cases)))
if len(aem.exposure_entities.keys()) > 0:
print("Data retrieved:")
for exposure_entity in aem.exposure_entities.keys():
print(" %s:" % exposure_entity)
for case in aem.exposure_entities[exposure_entity].occupancy_cases.keys():
print(" %s:" % case)
for attr in ["data_units_type", "data_units_level", "data_units_definition"]:
print(
" %s: %s"
% (
attr,
aem.exposure_entities[exposure_entity].occupancy_cases[case][attr],
)
)
# Leave the program
logger.info("gde-importer has finished")
sys.exit()
......
......@@ -28,7 +28,7 @@ setup(
keywords="Global Dynamic Exposure, GDE, buildings, exposure model",
author="Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ",
license="AGPLv3+",
install_requires=["numpy"],