Commit 5565d977 authored by Cecilia Nievas
Browse files

Added DataUnit class and feature to read data unit geometries

parent b9650126
Pipeline #25892 passed with stage
in 1 minute and 54 seconds
......@@ -18,6 +18,8 @@
import os
import pytest
import pandas
import numpy
from gdeimporter.configuration import Configuration
from gdeimporter.aggregatedexposuremodel import ExposureModelESRM20
......@@ -162,7 +164,7 @@ def test_ExposureModelESRM20():
== expected_contents["Entity_%s" % (i + 1)][case][parameter]
)
# Go on to test the retrieval of names of the data units
# Go on to test the retrieval of the data units
exposure_entities_to_run = ["Entity_1", "Entity_2"]
occupancies_to_run = ["residential", "commercial"]
config = Configuration(
......@@ -171,39 +173,80 @@ def test_ExposureModelESRM20():
config.data_pathname = os.path.join(
os.path.dirname(__file__), "data", "ExposureModelESRM20_non_trivial"
)
config.boundaries_pathname = os.path.join(
os.path.dirname(__file__), "data", "ExposureModelESRM20_non_trivial", "shapefiles"
)
for exposure_entity_name in exposure_entities_to_run:
for occupancy_case in occupancies_to_run:
returned_aem.get_data_units_names(
config, returned_aem.exposure_entities[exposure_entity_name], occupancy_case
returned_aem.get_data_units(config, exposure_entity_name, occupancy_case)
entity_names = ["Entity_1", "Entity_1", "Entity_2", "Entity_2"]
occupancy_names = ["residential", "commercial", "residential", "commercial"]
unit_ids = [
["Unit_1", "Unit_2", "Unit_3"],
["Unit_X", "Unit_Y", "Unit_Z"],
[],
["Unit_A", "Unit_B", "Unit_C"],
]
unit_geometries = pandas.read_csv(
os.path.join(
os.path.dirname(__file__),
"data",
"ExposureModelESRM20_non_trivial",
"shapefiles",
"unit_data_geometries.csv",
),
sep=",",
dtype={"LonW": str, "LonE": str, "LatN": str, "LatS": str},
)
assert returned_aem.exposure_entities["Entity_1"].occupancy_cases["residential"][
"data_units_names"
] == ["Unit_1", "Unit_2", "Unit_3"]
for i in range(len(entity_names)):
# Check number of data units
assert len(
returned_aem.exposure_entities[entity_names[i]]
.occupancy_cases[occupancy_names[i]]["data_units"]
.keys()
) == len(unit_ids[i])
# Check names of data units
for unit_id in unit_ids[i]:
assert (
returned_aem.exposure_entities["Entity_2"].occupancy_cases["residential"][
"data_units_names"
unit_id
in returned_aem.exposure_entities[entity_names[i]].occupancy_cases[
occupancy_names[i]
]["data_units"]
)
# Check bounds of the geometries of data units
for unit_id in unit_ids[i]:
geometry = (
returned_aem.exposure_entities[entity_names[i]]
.occupancy_cases[occupancy_names[i]]["data_units"][unit_id]
.geometry
)
returned_lon_w, returned_lat_s, returned_lon_e, returned_lat_n = geometry.bounds
expected_which_row = numpy.where(unit_geometries["DataUnit"].values == unit_id)[0][
0
]
== []
)
assert returned_aem.exposure_entities["Entity_1"].occupancy_cases["commercial"][
"data_units_names"
] == ["Unit_X", "Unit_Y", "Unit_Z"]
assert returned_aem.exposure_entities["Entity_2"].occupancy_cases["commercial"][
"data_units_names"
] == ["Unit_A", "Unit_B", "Unit_C"]
expected_lon_w = unit_geometries["LonW"].values[expected_which_row]
expected_lon_e = unit_geometries["LonE"].values[expected_which_row]
expected_lat_n = unit_geometries["LatN"].values[expected_which_row]
expected_lat_s = unit_geometries["LatS"].values[expected_which_row]
assert "{:.2f}".format(returned_lon_w) == expected_lon_w
assert "{:.2f}".format(returned_lon_e) == expected_lon_e
assert "{:.2f}".format(returned_lat_n) == expected_lat_n
assert "{:.2f}".format(returned_lat_s) == expected_lat_s
assert (
"data_units_names"
"data_units"
not in returned_aem.exposure_entities["Entity_1"].occupancy_cases["industrial"]
)
assert (
"data_units_names"
"data_units"
not in returned_aem.exposure_entities["Entity_4"].occupancy_cases["residential"]
)
# Test that Pandas raises a FileNotFoundError if the CSV file is not found
with pytest.raises(FileNotFoundError) as excinfo:
returned_aem.get_data_units_names(
config, returned_aem.exposure_entities["Entity_3"], "residential"
)
returned_aem.get_data_units(config, "Entity_3", "residential")
assert "FileNotFoundError" in str(excinfo.type)
# TO DO: test that the geometries are retrieved and stored correctly
......@@ -29,6 +29,7 @@ def test_Configuration():
assert returned_config.model_name == "esrm20"
assert returned_config.exposure_format == "esrm20"
assert returned_config.data_pathname == "/some/path/to/directory"
assert returned_config.boundaries_pathname == "/some/path/to/directory"
assert returned_config.occupancies_to_run == ["residential", "commercial", "industrial"]
# Test case in which the file is not found
......
#!/usr/bin/env python3
# Copyright (C) 2021:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import logging
import geopandas
import shapely
from pyproj import CRS
from gdeimporter.dataunit import DataUnit
logger = logging.getLogger()
def test_DataUnit():
    """Check that DataUnit keeps the requested ID and picks the matching geometry.

    A two-row GeoDataFrame is built (IDs stored in column "ID_X", one polygon
    per row, CRS assigned as "epsg:4326") and used to verify that:

    - for every ID present in the table, the resulting DataUnit exposes that
      ID and the polygon defined for the same row;
    - for an ID that is absent from the table, the resulting DataUnit still
      exposes the ID, and its geometry is None.
    """
    # Test case in which the data unit IDs are strings (usual case)
    id_column = "ID_X"
    known_ids = ["Unit_1", "123456"]
    polygons = [
        shapely.geometry.Polygon([(12.5, 3.7), (18.3, 4.2), (15.1, 6.7)]),
        shapely.geometry.Polygon([(22.5, 13.7), (28.3, 14.2), (25.1, 16.7)]),
    ]
    table = geopandas.GeoDataFrame(
        {id_column: known_ids, "geometry": geopandas.GeoSeries(polygons)}
    )
    table.crs = CRS("epsg:4326")

    # IDs and polygons were defined row by row, so they can be walked in lockstep
    for expected_id, expected_polygon in zip(known_ids, polygons):
        data_unit = DataUnit(expected_id, table, id_column)
        assert data_unit.id == expected_id
        assert data_unit.geometry == expected_polygon

    # Test case in which the ID is not found
    missing_id = "something"
    orphan_unit = DataUnit(missing_id, table, id_column)
    assert orphan_unit.id == missing_id
    assert orphan_unit.geometry is None
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment