Commit 4010738a authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Intermediate commit

parent a2e55b7b
Pipeline #36493 failed with stage
in 58 seconds
......@@ -24,6 +24,7 @@ import pandas
import geopandas
from gdeimporter.exposureentity import ExposureEntity
from gdeimporter.dataunit import DataUnit
from gdeimporter.tools.spatial import SpatialTools
logger = logging.getLogger()
......@@ -601,7 +602,15 @@ class ExposureModelESRM20(AggregatedExposureModel):
return exposure_entities
def get_data_units(self, configuration, exposure_entity_name, occupancy_case):
def get_data_units(
self,
configuration,
exposure_entity_name,
occupancy_case,
surface_threshold=1.0,
force_creation_data_units=False,
domain_polygon=None,
):
"""This function adds the DataUnits associated with an ExposureEntity with name
'exposure_entity_name' and occupancy `occupancy_case`.
......@@ -700,6 +709,171 @@ class ExposureModelESRM20(AggregatedExposureModel):
building_classes_proportions_and_properties[data_unit_id],
)
"""
# TO DO: Add condition to run based on configuration
self.ensure_full_geographic_coverage(
configuration,
exposure_entity_name,
occupancy_case,
geometries_table,
surface_threshold,
force_creation_data_units,
domain_polygon=domain_polygon,
)
"""
return
def ensure_full_geographic_coverage(
self,
configuration,
exposure_entity_name,
occupancy_case,
data_units_geometries_all,
surface_threshold,
force_creation_data_units=False,
domain_polygon=None,
):
"""This function...
WRIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIITEEEEEEEEEEEEEEEEEEEEEEE
Args:
configuration (Configuration object):
Instance of the Configuration class, with at least the following attributes:
WRIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIITEEEEEEEEEEEEEEEEEEEEEEE
exposure_entity_name (str):
Name of the ExposureEntity for which the function will be run.
occupancy_case (str):
Name of the occupancy case (e.g. "residential", "commercial", "industrial") for
which the function will be run. It needs to exist as a key of
self.exposure_entities[exposure_entity_name].occupancy_cases.
data_units_geometries_all
WRIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIITEEEEEEEEEEEEEEEEEEEEEEE
surface_threshold (positive float):
Absolute threshold of the difference between the surface area of the exposure
entity and that of all of its data units that defines the actions to be taken by
this function (percentage equal to or larger than zero).
force_creation_data_units (bool):
WRIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIITEEEEEEEEEEEEEEEEEEEEEEE
domain_polygon (Shapely Polygon or MultiPolygon):
Geometry against which the newly created data unit will be compared to define
whether it is kept, discarded or trimmed. If None, this comparison is not
carried out.
Returns:
WRIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIITEEEEEEEEEEEEEEEEEEEEEEE
"""
data_units_defined_ids = list(
self.exposure_entities[exposure_entity_name]
.occupancy_cases[occupancy_case]["data_units"]
.keys()
)
data_units_defined_geometries = [
dataunit.geometry
for dataunit in self.exposure_entities[exposure_entity_name]
.occupancy_cases[occupancy_case]["data_units"]
.values()
]
geometry_exposure_entity_aux = self._read_geometries_table(
configuration,
self.exposure_entities[exposure_entity_name],
occupancy_case,
{"ID_0": str},
automatic_level_detection=False,
requested_level=0,
)
# TO DO: move this check somewhere else? Before getting here?
if geometry_exposure_entity_aux.shape[0] > 1:
raise ValueError(
"The geodata file for Exposure Entity %s contains more than one geometry. "
"'ensure_full_geographic_coverage' cannot run.
% (exposure_entity_name)
)
geometry_exposure_entity = geometry_exposure_entity_aux["geometry"].to_numpy()[0]
del geometry_exposure_entity_aux
# Check that 'geometry_exposure_entity' and 'data_units_geometries_all' are contained
# in 'domain_polygon'
if domain_polygon is not None:
exposure_entity_fully_inside = SpatialTools.check_bounding_box_inside_polygon(
geometry_exposure_entity, domain_polygon
)
if not exposure_entity_fully_inside:
geometry_exposure_entity = geometry_exposure_entity.overlay(
domain_polygon, how="intersection"
)
data_units_all_fully_inside = SpatialTools.check_bounding_box_inside_polygon(
data_units_geometries_all, domain_polygon
)
if not data_units_all_fully_inside:
data_units_geometries_all = data_units_geometries_all.overlay(
domain_polygon, how="intersection"
)
num_data_units_defined = len(data_units_defined_ids)
num_data_units_all = data_units_geometries_all.shape[0]
if (num_data_units_defined != num_data_units_all) or force_creation_data_units:
# Create data units to cover missing areas
SpatialTools.geometries_difference(
data_units_defined_geometries,
geometry_exposure_entity,
)
# I LEFT IT HERE
# need to add column of "ID_X"
# need to create a name for the new data unit under "ID_X"
# need to gather building classes for whole exposure entity
else:
# Check difference in surface area of exposure entity and defined data units
# Surface area of the data units for which exposure of occupancy_case is defined
surface_data_units_defined = self.exposure_entities[
exposure_entity_name
].sum_surface_area_of_data_units(occupancy_case, configuration.number_cores)
# Surface area of the exposure entity (defined as administrative level 0)
surface_exposure_entity = SpatialTools.get_area_albers_equal_area(
geometry_exposure_entity
)
surface_difference = (
100.0
* (surface_exposure_entity - surface_data_units_defined)
/ surface_exposure_entity
)
if surface_difference > surface_threshold:
# Create data units to cover what is missing
SpatialTools.geometries_difference(
data_units_defined_geometries,
geometry_exposure_entity,
)
else:
if surface_difference < -surface_threshold:
# This case needs to be revised manually by a human (surface_difference is
# smaller than surface_threshold with negative sign)
logger.warning(
"ensure_full_geographic_coverage has detected a potential issue with "
"Exposure Entity '%s', occupance case '%s': The surface area of the "
"exposure entity is smaller than that of all its data units for which "
"the aggregated exposure model '%s' is defined by %s %. To force the "
"creation of data units set 'forece_filling_of_exposure_entity' to "
"True in the config.yml file."
% (
exposure_entity_name,
occupancy_case,
self.model_name,
surface_difference,
)
)
else:
# surface_difference is smaller than surface_threshold and larger than
# surface_threshold with negative sign --> do nothing
pass
return
def _map_data_units_types(self, original_description):
......@@ -842,9 +1016,21 @@ class ExposureModelESRM20(AggregatedExposureModel):
return data_table
def _read_geometries_table(self, configuration, exposure_entity, occupancy_case, datatypes):
def _read_geometries_table(
self,
configuration,
exposure_entity,
occupancy_case,
datatypes,
automatic_level_detection=True,
requested_level=None,
):
"""This function reads a geodata file containing ESRM20-compatible boundaries and
returns it as a GeoPandas GeoDataFrame with geometries defined in EPSG:4326.
returns it as a GeoPandas GeoDataFrame with geometries defined in EPSG:4326. If
'automatic_level_detection' is True, the administrative level is automatically
determined to be that in which exposure is defined for 'occupancy_case' in
'exposure_entity'; otherwise, 'requested_level' needs to be set to indicate the desired
administrative level.
Args:
configuration (Configuration object):
......@@ -867,6 +1053,14 @@ class ExposureModelESRM20(AggregatedExposureModel):
file. The data types are first automatically determined by GeoPandas but then
forced to match those of `datatypes` within this function. It can be an empty
dictionary.
automatic_level_detection (bool):
If True, the administrative level is automatically determined to be that in
which exposure is defined for 'occupancy_case' in 'exposure_entity'; otherwise,
'requested_level' needs to be set to indicate the desired administrative level.
Default: True.
requested_level (int):
Target administrative level. It will only be used if 'automatic_level_detection'
is True.
Returns:
geometries_table (GeoPandas GeoDataFrame):
......@@ -874,10 +1068,15 @@ class ExposureModelESRM20(AggregatedExposureModel):
EPSG:4326.
"""
filename = self.boundary_filename_pattern["filename"] % (
exposure_entity.occupancy_cases[occupancy_case][
if automatic_level_detection:
target_level = exposure_entity.occupancy_cases[occupancy_case][
self.boundary_filename_pattern["first"]
],
]
else:
target_level = str(requested_level)
filename = self.boundary_filename_pattern["filename"] % (
target_level,
getattr(exposure_entity, self.boundary_filename_pattern["second"]),
)
......
......@@ -17,6 +17,7 @@
# along with this program. If not, see http://www.gnu.org/licenses/.
import logging
import numpy
import iso3166
from multiprocessing import Pool
from functools import partial
......@@ -527,6 +528,39 @@ class ExposureEntity:
return
def sum_surface_area_of_data_units(self, occupancy_case, number_cores):
"""This function calculates and adds up the surface area of all the data units contained
in this Exposure Entity for 'occupancy_case' using the Albers equal area projection.
Args:
occupancy_case (str):
Name of the occupancy case (e.g. "residential", "commercial", "industrial") for
which the surface areas of the data units will be calculated and added.
number_cores (int):
Number of CPU cores to be used to run this function.
Returns:
total_surface (float):
Sum of the surface area of all the data units contained in this Exposure Entity
for 'occupancy_case', in m2.
"""
data_units_geometries = [
dataunit.geometry
for dataunit in self.occupancy_cases[occupancy_case]["data_units"].values()
]
p = Pool(processes=number_cores)
data_units_surface_areas = p.map(
DataUnitTilesHelper.get_area_albers_equal_area, data_units_geometries
)
p.close()
p.join()
total_surface = numpy.array(data_units_surface_areas).sum()
return total_surface
def _interpret_exposure_entities_code(self, config_code):
"""This function interprets the value of exposure_entities_code given as configuration.
......
......@@ -238,3 +238,70 @@ class SpatialTools:
geometry = shapely.ops.transform(project_albers, input_polygon)
return geometry.area
@staticmethod
def check_bounding_box_inside_polygon(in_geometry_1, in_geometry_2):
"""This function determines whether the bounding box of 'in_geometry_1' is fully
contained in 'in_geometry_2' or not.
Args:
in_geometry_1 (Shapely Polygon or MultiPolygon):
Polygon defined in ESPG:4326.
in_geometry_2 (Shapely Polygon or MultiPolygon):
Polygon defined in ESPG:4326.
Returns:
fully_contained (bool):
If True, the bounding box of 'in_geometry_1' is fully contained in
'in_geometry_2'; if False, it is not.
"""
# Create GeoPandas GeoDataFrame with the bounding box of 'in_geometry_1'
bounding_polygon = shapely.geometry.Polygon([
(in_geometry_1.total_bounds[0], in_geometry_1.total_bounds[1]),
(in_geometry_1.total_bounds[2], in_geometry_1.total_bounds[1]),
(in_geometry_1.total_bounds[2], in_geometry_1.total_bounds[3]),
(in_geometry_1.total_bounds[0], in_geometry_1.total_bounds[3])
]
)
bounding_gdf = geopandas.GeoDataFrame({"geometry": [bounding_polygon]}, geometry="geometry")
bounding_gdf.crs = pyproj.CRS("epsg:4326")
NEED TO REPROJECT TO SOMETHING GOOD WITH AREAS!!
# Create GeoPandas GeoDataFrame with 'in_geometry_2'
in_geometry_2_gdf = geopandas.GeoDataFrame({"geometry": [in_geometry_2]}, geometry="geometry")
in_geometry_2_gdf.crs = pyproj.CRS("epsg:4326")
# Calculate the difference
bounding_difference = bounding_gdf.overlay(in_geometry_2_gdf, how='difference')
if bounding_difference.shape[0] < 1:
fully_contained = True
else:
fully_contained = False
return fully_contained
@staticmethod
def geometries_difference(in_geometries_parts, in_geometry_encompassing):
"""This function creates new geometries from the difference between
'in_geometry_encompassing' and 'in_geometries_parts'. Though 'in_geometries_parts' are
intended to be geometries smaller than 'in_geometry_encompassing' and contained by it,
the function will work with any input geometries.
Args:
in_geometries_parts (list of Shapely polygons):
List of geometries.
in_geometry_encompassing
"""
in_geometries = geopandas.GeoDataFrame(
{"geometry": in_geometries_parts}, geometry="geometry"
)
in_geometries = in_geometries.dissolve()
out_geometry = in_geometry_encompassing.overlay(in_geometries, how="difference")
out_geometries = out_geometry["geometry"].to_numpy()
return out_geometries
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment