Commit 91140475 authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Added feature to get geometries of model domain and exposure entities

parent a2e55b7b
Pipeline #36491 passed with stage
in 2 minutes and 38 seconds
......@@ -2,6 +2,7 @@ model_name: esrm20
exposure_format: esrm20 # Only supported value for now
data_pathname: path_to_directory_with_model_data
boundaries_pathname: path_to_directory_with_boundary_files
domain_boundary_filepath: path_to_file_with_domain_boundary # Boundary that defines the limits of the input aggregated model; optional
occupancies_to_run: residential, commercial # Need to exist for the indicated `exposure format`, industrial not supported
exposure_entities_to_run: Luxembourg # Either "all", a comma-space-separated list of entity names, or a name of a .txt or .csv file
exposure_entities_code: ISO3 # Either "ISO3" in this or a nested structure with exposure entities names and 3-character codes
......
......@@ -24,6 +24,8 @@ import pandas
import geopandas
from gdeimporter.exposureentity import ExposureEntity
from gdeimporter.dataunit import DataUnit
from gdeimporter.tools.spatial import SpatialTools
logger = logging.getLogger()
......@@ -36,6 +38,10 @@ class AggregatedExposureModel(abc.ABC):
Name of the input aggregated model.
self.exposure_format (str):
Format of the input aggregated model. Currently supported values: "esrm20".
self.domain_geometry (Shapely Polygon or Multipolygon):
Geometry within which the AggregatedExposureModel is defined. It should not
follow the boundaries of its exposure entities with precision but allow to determine
if the bounding boxes of its exposure entities are fully contained within it.
self.occupancy_cases (dict):
Dictionary in which each first level key corresponds to an occupancy case (e.g.
"residential", "commercial", "industrial") for which the input aggregated exposure
......@@ -63,6 +69,7 @@ class AggregatedExposureModel(abc.ABC):
self.model_name = configuration.model_name
self.exposure_format = configuration.exposure_format
self.domain_geometry = self._retrieve_domain_geometry(configuration)
self.occupancy_cases = None
self.exposure_entities = None
self.filename_pattern = None
......@@ -298,6 +305,36 @@ class AggregatedExposureModel(abc.ABC):
return
def _retrieve_domain_geometry(self, configuration):
"""This function retrieves the geometry of the geographic domain of the
AggregatedExposureModel, following the path indicated in 'configuration'. If
configuration.domain_boundary_filepath is None, it returns None.
Args:
configuration (Configuration object):
Instance of the Configuration class.
Returns:
domain_polygon (Shapely Polygon or Multipolygon):
Geometry retrieved from configuration.domain_boundary_filepath.
"""
if configuration.domain_boundary_filepath is None:
return None
# Read the data file (errors will be handled by geopandas)
domain_polygon_gdf = geopandas.GeoDataFrame.from_file(
os.path.join(configuration.domain_boundary_filepath),
)
domain_polygon_gdf = domain_polygon_gdf.to_crs("EPSG:4326")
if domain_polygon_gdf.shape[0] > 1:
domain_polygon_gdf = domain_polygon_gdf.dissolve()
domain_polygon = domain_polygon_gdf["geometry"].to_numpy()[0]
return domain_polygon
class ExposureModelESRM20(AggregatedExposureModel):
"""This class represents the European Seismic Risk Model 2020 (ESRM20) aggregated exposure
......@@ -483,6 +520,9 @@ class ExposureModelESRM20(AggregatedExposureModel):
used as the codes for the exposure entities. If it is a dictionary
instead, its contents will be used to assign the codes to the exposure
entities.
boundaries_pathname (str):
Path to the directory that contains the geodata files with the
boundaries.
Returns:
exposure_entities (dictionary of ExposureEntity):
......@@ -567,10 +607,41 @@ class ExposureModelESRM20(AggregatedExposureModel):
for exposure_entity in read_names:
if exposure_entity not in exposure_entities.keys():
# Retrieve boundaries of the exposure entity
geometry_exposure_entity = self._read_exposure_entity_geometry(
configuration,
exposure_entity,
datatypes={"ID_0": str},
)
if self.domain_geometry is not None:
# Verify that the bounding box of 'geometry' lies within the domain
# polygon of the Aggregated Exposure Model
exposure_entity_fully_inside = (
SpatialTools.check_bounding_box_inside_polygon(
geometry_exposure_entity, self.domain_geometry
)
)
if not exposure_entity_fully_inside:
# Trim the boundaries of the exposure entity
geometry_exposure_entity = SpatialTools.geometries_intersection(
geometry_exposure_entity, self.domain_geometry
)
logger.warning(
"Boundary for Exposure Entity '%s' was trimmed to fit within "
"the domain polygon of the Aggregated Exposure Model."
% (exposure_entity)
)
# Create ExposureEntity object
exposure_entities[exposure_entity] = ExposureEntity(
exposure_entity, configuration.exposure_entities_code, self.currency
exposure_entity,
configuration.exposure_entities_code,
geometry_exposure_entity,
self.currency,
)
# Retrieve metadata regarding the data units of this exposure_entity
output = self._map_data_units_types(data_units_types_row.loc[exposure_entity])
(data_units_type, data_units_level, data_units_definition) = output
output = {
......@@ -668,7 +739,7 @@ class ExposureModelESRM20(AggregatedExposureModel):
self.boundary_filename_pattern["first"]
]
)
geometries_table = self._read_geometries_table(
geometries_table = self._read_data_units_geometries_table(
configuration,
self.exposure_entities[exposure_entity_name],
occupancy_case,
......@@ -842,7 +913,9 @@ class ExposureModelESRM20(AggregatedExposureModel):
return data_table
def _read_geometries_table(self, configuration, exposure_entity, occupancy_case, datatypes):
def _read_data_units_geometries_table(
self, configuration, exposure_entity, occupancy_case, datatypes
):
"""This function reads a geodata file containing ESRM20-compatible boundaries and
returns it as a GeoPandas GeoDataFrame with geometries defined in EPSG:4326.
......@@ -888,20 +961,64 @@ class ExposureModelESRM20(AggregatedExposureModel):
geometries_table = geometries_table.to_crs("EPSG:4326")
# Force data types (dtype=datatypes not working in geopandas.GeoDataFrame.from_file)
column_names = list(datatypes)
for column_name in column_names:
if (
geometries_table[column_name].dtype == "float64"
and datatypes[column_name] is str
):
# Condition needed to deal with integers that geopandas reads as floats
geometries_table[column_name] = geometries_table[column_name].astype(int)
geometries_table[column_name] = geometries_table[column_name].astype(
datatypes[column_name]
)
geometries_table = SpatialTools.force_data_types_in_GeoDataFrame(
geometries_table, datatypes
)
return geometries_table
def _read_exposure_entity_geometry(
self,
configuration,
exposure_entity_name,
datatypes,
):
"""This function reads a geodata file containing the boundaries of an exposure entity
with name 'exposure_entity_name' and returns it as a Shapely Polygon/Multipolygon in
EPSG:4326.
Args:
configuration (Configuration object):
Instance of the Configuration class, with at least the following attribute:
boundaries_pathname (str):
Path to the directory that contains the geodata files with the
boundaries.
exposure_entity_name (str):
Name of the exposure entity whose boundary will be retrieved.
datatypes (dict):
Dictionary indicating the data type/s of (a) column/s of interest in the CSV
file. The data types are first automatically determined by GeoPandas but then
forced to match those of `datatypes` within this function. It can be an empty
dictionary.
Returns:
geometry (Shapely Polygon or MultiPolygon):
Boundaries of the exposure entity with name 'exposure_entity_name' in EPSG:4326.
"""
filename = self.boundary_filename_pattern["filename"] % (
0,
exposure_entity_name,
)
# Read the data file (errors will be handled by geopandas)
geometries_table = geopandas.GeoDataFrame.from_file(
os.path.join(configuration.boundaries_pathname, filename),
)
geometries_table = geometries_table.to_crs("EPSG:4326")
# Force data types (dtype=datatypes not working in geopandas.GeoDataFrame.from_file)
geometries_table = SpatialTools.force_data_types_in_GeoDataFrame(
geometries_table, datatypes
)
if geometries_table.shape[0] > 1:
geometries_table = geometries_table.dissolve()
geometry = geometries_table["geometry"].to_numpy()[0]
return geometry
def _retrieve_totals_all_data_units(
self, data_table, ids_column_name, data_units_ids, exposure_entity_name
):
......
......@@ -38,6 +38,10 @@ class Configuration:
self.boundaries_pathname (str):
Path to the directory that contains the boundaries of the exposure units convered
by the input aggregated exposure model.
self.domain_boundary_filepath (str):
Path to the geodata file that contains the boundaries within which the input
aggregated model is defined. It does not need to follow the boundaries of its
exposure units exactly.
self.occupancies_to_run (list of str):
List of keys of occupancy_cases of the input aggregated exposure model for which
data will be retrieved.
......@@ -118,6 +122,9 @@ class Configuration:
self.exposure_format = self._assign_parameter(config, "exposure_format")
self.data_pathname = self._assign_parameter(config, "data_pathname")
self.boundaries_pathname = self._assign_parameter(config, "boundaries_pathname")
self.domain_boundary_filepath = self._assign_parameter(
config, "domain_boundary_filepath"
)
self.occupancies_to_run = self._assign_listed_parameters(config, "occupancies_to_run")
self.exposure_entities_to_run = self._assign_listed_parameters(
config, "exposure_entities_to_run"
......
......@@ -37,6 +37,8 @@ class ExposureEntity:
self.code (str):
3-character code that uniquely identifies this exposure entity. If the exposure
entity is a country, this is the ISO3 code for the country.
self.geometry (Shapely Polygon or Multipolygon):
Geometry of the exposure entity.
self.currency (str):
Currency used to express building replacement costs.
self.occupancy_cases (dict):
......@@ -112,19 +114,22 @@ class ExposureEntity:
|_ ...
"""
def __init__(self, name, config_code, currency):
def __init__(self, name, config_code, geometry, currency):
"""
Args:
name (str):
Name of the exposure entity.
config_code (str or dict):
Either "ISO3" (str) or a dictionary, of which name needs to be a key.
geometry (Shapely Polygon or MultiPolygon):
Geometry of the exposure entity.
currency (str):
Currency used to express building replacement costs.
"""
self.name = name
self.code = self._interpret_exposure_entities_code(config_code)
self.geometry = geometry
self.currency = currency
self.occupancy_cases = {}
......
......@@ -23,6 +23,7 @@ import geopandas
import pyproj
import mercantile
import shapely
from copy import deepcopy
logger = logging.getLogger()
......@@ -238,3 +239,179 @@ class SpatialTools:
geometry = shapely.ops.transform(project_albers, input_polygon)
return geometry.area
@staticmethod
def check_bounding_box_inside_polygon(in_geometry_1, in_geometry_2, in_crs="epsg:4326"):
"""This function determines whether the bounding box of 'in_geometry_1' is fully
contained in 'in_geometry_2' or not. The operation is carried out using the Albers Equal
Area projection.
Args:
in_geometry_1, in_geometry_2 (Shapely Polygons or MultiPolygons):
Polygons or multipolygons defined in 'in_crs'.
in_crs (str):
Coordinate reference system (CRS) in which 'in_geometry_1' and 'in_geometry_2'
are defined. Default: "epsg:4326".
Returns:
fully_contained (bool):
If True, the bounding box of 'in_geometry_1' is fully contained in
'in_geometry_2'; if False, it is not.
"""
# Create GeoPandas GeoDataFrame with the bounding box of 'in_geometry_1'
bounding_polygon = shapely.geometry.Polygon(
[
(in_geometry_1.bounds[0], in_geometry_1.bounds[1]),
(in_geometry_1.bounds[2], in_geometry_1.bounds[1]),
(in_geometry_1.bounds[2], in_geometry_1.bounds[3]),
(in_geometry_1.bounds[0], in_geometry_1.bounds[3]),
]
)
bounding_gdf = SpatialTools.geodataframe_from_shapely([bounding_polygon], in_crs)
# Create GeoPandas GeoDataFrame with 'in_geometry_2'
in_geometry_2_gdf = SpatialTools.geodataframe_from_shapely([in_geometry_2], in_crs)
# Project both geometries using Albers equal area
(
bounding_gdf,
in_geometry_2_gdf,
) = SpatialTools.project_two_geodataframes_to_albers_equal_area(
bounding_gdf, in_geometry_2_gdf
)
# Calculate the difference
bounding_difference = geopandas.overlay(
bounding_gdf, in_geometry_2_gdf, how="difference"
)
bounding_difference = bounding_difference.to_crs(in_crs)
if bounding_difference.shape[0] < 1:
fully_contained = True
else:
fully_contained = False
return fully_contained
@staticmethod
def geometries_intersection(in_geometry_1, in_geometry_2, in_crs="epsg:4326"):
"""This function returns the intersection of 'in_geometry_1' and 'in_geometry_2',
carried out using the Albers Equal Area projection.
Args:
in_geometry_1, in_geometry_2 (Shapely Polygons or MultiPolygons):
Polygons or multipolygons defined in 'in_crs'.
in_crs (str):
Coordinate reference system (CRS) in which 'in_geometry_1' and 'in_geometry_2'
are defined. Default: "epsg:4326".
Returns:
out_geometry (Shapely Polygon or MultiPolygon):
Intersection of 'in_geometry_1' and 'in_geometry_2', defined in 'in_crs'.
"""
# Create GeoPandas GeoDataFrames
in_geometry_1_gdf = SpatialTools.geodataframe_from_shapely([in_geometry_1], in_crs)
in_geometry_2_gdf = SpatialTools.geodataframe_from_shapely([in_geometry_2], in_crs)
# Project both geometries using Albers equal area
(
in_geometry_1_gdf,
in_geometry_2_gdf,
) = SpatialTools.project_two_geodataframes_to_albers_equal_area(
in_geometry_1_gdf, in_geometry_2_gdf
)
out_geometry_gdf = geopandas.overlay(
in_geometry_1_gdf, in_geometry_2_gdf, how="intersection"
)
if out_geometry_gdf.shape[0] > 1:
out_geometry_gdf = out_geometry_gdf.dissolve()
out_geometry_gdf = out_geometry_gdf.to_crs(in_crs)
out_geometry = out_geometry_gdf["geometry"].to_numpy()[0]
return out_geometry
@staticmethod
def geodataframe_from_shapely(in_geometries, in_crs="epsg:4326"):
"""This function returns a GeoPandas GeoDataFrame containing the geometry/geometries
listed in 'in_geometries', defined with coordinate reference system (CRS) as per
'in_crs'.
Args:
in_geometries (list of Shapely Polygons or MultiPolygons):
List of geometries.
in_crs (str):
Coordinate reference system (CRS) in which 'in_geometries' is defined. Default:
"epsg:4326".
Returns:
out_gdf (GeoPandas GeoDataFrame):
GeoPandas GeoDataFrame with 'in_geometries' defined in the "geometry" column.
"""
out_gdf = geopandas.GeoDataFrame({"geometry": in_geometries}, geometry="geometry")
out_gdf.crs = pyproj.CRS(in_crs)
return out_gdf
@staticmethod
def project_two_geodataframes_to_albers_equal_area(in_gdf_1, in_gdf_2):
"""This function re-projects 'in_gdf_1' and 'in_gdf_2' into the Albers Equal Area
projection, using the bounding box of both input GeoDataFrames to define the geographic
extent for the projection.
Args:
in_gdf_1, in_gdf_2 (GeoPandas GeoDataFrames)
Returns:
out_gdf_1, out_gdf_2 (GeoPandas GeoDataFrames):
'in_gdf_1' and 'in_gdf_2' re-projected into the Albers Equal Area projection.
"""
out_gdf_1 = deepcopy(in_gdf_1)
out_gdf_2 = deepcopy(in_gdf_2)
aux_gdf = geopandas.overlay(out_gdf_1, out_gdf_2, how="union")
lon_w = aux_gdf.total_bounds[0]
lat_s = aux_gdf.total_bounds[1]
lon_e = aux_gdf.total_bounds[2]
lat_n = aux_gdf.total_bounds[3]
del aux_gdf
projection_string = "+proj=aea +lat_1={} +lat_2={} +lat_0={} +lon_0={}".format(
lat_s, lat_n, (lat_s + lat_n) / 2.0, (lon_w + lon_e) / 2.0
)
out_gdf_1 = out_gdf_1.to_crs(pyproj.CRS(projection_string))
out_gdf_2 = out_gdf_2.to_crs(pyproj.CRS(projection_string))
return out_gdf_1, out_gdf_2
@staticmethod
def force_data_types_in_GeoDataFrame(geodataframe, datatypes):
"""This function forces the datatypes of the columns of 'geodataframe' as indicated in
'datatypes'. The parameter 'dtype' is not working in geopandas.GeoDataFrame.from_file.
Args:
geodataframe (GeoPandas GeoDataFrame):
GeoPandas GeoDataFrame whose data types will be forced to match those indicated
in 'datatypes'
datatypes (dict):
Dictionary indicating the data type/s of (a) column/s of interest in
'geodataframe'. If empty, this function does not do anything.
Returns:
geodataframe (GeoPandas GeoDataFrame):
GeoPandas GeoDataFrame with data types matching those indicated in 'datatypes'.
"""
column_names = list(datatypes)
for column_name in column_names:
if geodataframe[column_name].dtype == "float64" and datatypes[column_name] is str:
# Condition needed to deal with integers that geopandas reads as floats
geodataframe[column_name] = geodataframe[column_name].astype(int)
geodataframe[column_name] = geodataframe[column_name].astype(datatypes[column_name])
return geodataframe
GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137.0,298.257223563]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]]
\ No newline at end of file
GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137.0,298.257223563]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]]
\ No newline at end of file
GEOGCS["GCS_WGS_1984",DATUM["D_WGS_1984",SPHEROID["WGS_1984",6378137.0,298.257223563]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]]
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment