Commit ba721b39 authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Added feature to retrieve factors for disaggregation of costs

parent 38eec665
Pipeline #35725 passed with stage
in 2 minutes
......@@ -274,6 +274,14 @@ class ExposureModelESRM20(AggregatedExposureModel):
data_units_types_field (str):
Name of the field in sheet_name from which to retrieve information on the
types of data_units.
costs_disaggregation_header (str):
Name of the header associated with the occupancy case in the file that
contains information on the disaggregation of replacement costs.
population_time_distribution_fields (dict):
Dictionary specifying the names of the fields within the file
self.file_structure["population_time_distribution"]["filename"] associated
with the occupancy case and a specific time of the day. It contains the
subkeys "Day", "Night" and "Transit".
self.filename_pattern (str):
Pattern of the names of the ESRM20 CSV files.
self.boundary_filename_pattern (str):
......@@ -286,6 +294,28 @@ class ExposureModelESRM20(AggregatedExposureModel):
Relative path to the metadata .xlsx file.
CSVs (str):
Relative path to the folder that contains the CSV files per exposure entity.
population_time_distribution (dict):
Dictionary specifying the file and fields from which information on the
distribution of the census population present in each building at a specific
time of the day can be retrieved. It contains the following subkeys:
filename (str):
Relative path to the file containing these data.
sheet_name (str):
Name of the tab in the .xlsx file.
costs_disaggregation (dict):
Dictionary specifying the file structure and fields from which information
on the disaggregation of replacement costs into structural components,
non-structural components and contents can be retrieved. It contains the
following subkeys:
filename (str):
Relative path to the file containing these data.
sheet_name (str):
Name of the tab in the metadata .xlsx file where the information on
cost disaggregation can be found.
fields (dict):
Names of the fields containing the factors to disaggregate
replacement costs into structural components, non-structural
components and contents.
self.csv_column_names
Names of columns of interest in the CSV files.
self.currency (str):
......@@ -304,16 +334,34 @@ class ExposureModelESRM20(AggregatedExposureModel):
"short": "Res",
"sheet_name": "RES",
"data_units_types_field": "Admin level resolution/aggregation",
"costs_disaggregation_header": "RES",
"population_time_distribution_fields": {
"Day": "RES_DAY",
"Night": "RES_NIGHT",
"Transit": "RES_TRANSIT",
},
},
"commercial": {
"short": "Com",
"sheet_name": "COM",
"data_units_types_field": "Admin level resolution/aggregation",
"costs_disaggregation_header": "COM",
"population_time_distribution_fields": {
"Day": "NONRES_DAY",
"Night": "NONRES_NIGHT",
"Transit": "NONRES_TRANSIT",
},
},
"industrial": {
"short": "Ind",
"sheet_name": "IND",
"data_units_types_field": "Resolution",
"costs_disaggregation_header": "IND",
"population_time_distribution_fields": {
"Day": "NONRES_DAY",
"Night": "NONRES_NIGHT",
"Transit": "NONRES_TRANSIT",
},
},
}
self.filename_pattern = {
......@@ -331,21 +379,15 @@ class ExposureModelESRM20(AggregatedExposureModel):
"CSVs": "_exposure_models",
"population_time_distribution": {
"filename": "social_indicators/population_distribution_PAGER.xlsx",
"tab": "data",
"residential": {
"Day": "RES_DAY",
"Night": "RES_NIGHT",
"Transit": "RES_TRANSIT",
},
"commercial": {
"Day": "NONRES_DAY",
"Night": "NONRES_NIGHT",
"Transit": "NONRES_TRANSIT",
},
"industrial": {
"Day": "NONRES_DAY",
"Night": "NONRES_NIGHT",
"Transit": "NONRES_TRANSIT",
"sheet_name": "data",
},
"costs_disaggregation": {
"filename": "sources/European_Exposure_Model_Data_Inputs_Sources.xlsx",
"sheet_name": "Notes",
"fields": {
"Structural": "Replacement cost: structural / total",
"Non-Structural": "Replacement cost: non-structural / total",
"Contents": "Replacement cost: contents / total",
},
},
}
......@@ -375,7 +417,7 @@ class ExposureModelESRM20(AggregatedExposureModel):
"census_people_per_building": "OCCUPANTS_PER_BUILDING",
"total_cost_per_building": "TOTAL_REPL_COST_PER_BUILDING",
}
self.currency = "EUR"
self.currency = "EUR 2020"
self.exposure_entities = self.retrieve_exposure_entities(configuration)
def retrieve_exposure_entities(self, configuration):
......@@ -418,10 +460,13 @@ class ExposureModelESRM20(AggregatedExposureModel):
configuration.data_pathname,
self.file_structure["population_time_distribution"]["filename"],
),
sheet_name=self.file_structure["population_time_distribution"]["tab"],
sheet_name=self.file_structure["population_time_distribution"]["sheet_name"],
index_col=0, # Use first column as index
)
# Read metadata on the disaggregation of costs and retrieve factors
costs_disaggregation_all = self._retrieve_costs_disaggregation(configuration)
# Needs to go by occupancy case because the names and properties of the exposure
# entities can only be read from the metadata file for a sheet that is associated
# with a particular occupancy case:
......@@ -478,7 +523,7 @@ class ExposureModelESRM20(AggregatedExposureModel):
for exposure_entity in read_names:
if exposure_entity not in exposure_entities.keys():
exposure_entities[exposure_entity] = ExposureEntity(
exposure_entity, configuration.exposure_entities_code
exposure_entity, configuration.exposure_entities_code, self.currency
)
output = self._map_data_units_types(data_units_types_row.loc[exposure_entity])
......@@ -490,16 +535,22 @@ class ExposureModelESRM20(AggregatedExposureModel):
"population_time_distribution": {},
}
for time in self.file_structure["population_time_distribution"][case].keys():
for time in self.occupancy_cases[case][
"population_time_distribution_fields"
].keys():
output["population_time_distribution"][time] = round(
population_time.loc[
exposure_entity,
self.file_structure["population_time_distribution"][case][time],
self.occupancy_cases[case]["population_time_distribution_fields"][
time
],
]
/ 100.0,
7,
)
output["costs_disaggregation"] = costs_disaggregation_all[case]
# Write the contents occupancy_cases to the ExposureEntity object
exposure_entities[exposure_entity].occupancy_cases[case] = output
......@@ -601,7 +652,6 @@ class ExposureModelESRM20(AggregatedExposureModel):
total_dwellings[data_unit_id],
total_people[data_unit_id],
total_cost[data_unit_id],
self.currency,
building_classes_proportions_and_properties[data_unit_id],
)
......@@ -1048,3 +1098,79 @@ class ExposureModelESRM20(AggregatedExposureModel):
proportions_and_properties[data_unit_id] = pandas.DataFrame(unique_vals)
return proportions_and_properties
def _retrieve_costs_disaggregation(self, configuration):
    """This function retrieves the factors through which the total replacement cost of a
    building can be disaggregated into that of structural components, non-structural
    components and contents.

    The factors are read from the file and sheet indicated in
    self.file_structure["costs_disaggregation"], skipping the first six rows and treating no
    row as a header. For each occupancy case, the first column is searched for the header
    self.occupancy_cases[case]["costs_disaggregation_header"]; for each cost type, the factor
    is the value in the second column of the closest row below that header whose first column
    matches the name given in self.file_structure["costs_disaggregation"]["fields"].

    Args:
        configuration (Configuration object):
            Instance of the Configuration class, with at least the following attribute:
                data_pathname (str):
                    Path to the directory that contains the input aggregated exposure
                    model data.

    Returns:
        costs_disaggregation_all (dict):
            Dictionary with one key per occupancy case of self.occupancy_cases. Each key
            contains the following sub-keys:
                Structural (float):
                    Factor to obtain the cost of the structural components.
                Non-Structural (float):
                    Factor to obtain the cost of the non-structural components.
                Contents (float):
                    Factor to obtain the cost of the building contents.

    Raises:
        ValueError:
            If the header of an occupancy case cannot be found in the first column, if a
            cost field cannot be found below the header of an occupancy case, or if the
            factors of an occupancy case do not add up to 1 (within a tolerance of 1e-5;
            a sum that is NaN is rejected as well).
    """

    settings = self.file_structure["costs_disaggregation"]

    # Read the metadata file
    costs_disaggregation = pandas.read_excel(
        os.path.join(configuration.data_pathname, settings["filename"]),
        sheet_name=settings["sheet_name"],
        header=None,
        skiprows=6,
    )

    # First column: headers and field names; second column: values of the factors
    labels = costs_disaggregation[0].to_numpy()
    values = costs_disaggregation[1].to_numpy()

    costs_disaggregation_all = {}

    for case in self.occupancy_cases:
        # Identify the row of costs_disaggregation where case starts
        case_header = self.occupancy_cases[case]["costs_disaggregation_header"]
        header_rows = numpy.where(labels == case_header)[0]
        if header_rows.size == 0:
            raise ValueError(
                "Header '%s' of %s occupancy not found in sheet '%s' of file '%s'."
                % (case_header, case, settings["sheet_name"], settings["filename"])
            )
        which_case_header = header_rows[0]

        costs_disaggregation_all[case] = {}
        factor_sum = 0.0

        for cost_type, cost_field in settings["fields"].items():
            # Identify the row of costs_disaggregation where this cost_type is for this case:
            # the closest matching row below the header of the case.
            # NOTE: the search is not bounded by the header of the following case.
            field_rows = numpy.where(labels == cost_field)[0]
            rows_after_header = field_rows[field_rows > which_case_header]
            if rows_after_header.size == 0:
                raise ValueError(
                    "Field '%s' not found below header '%s' of %s occupancy in sheet '%s' "
                    "of file '%s'."
                    % (
                        cost_field,
                        case_header,
                        case,
                        settings["sheet_name"],
                        settings["filename"],
                    )
                )
            which_field = rows_after_header.min()

            # Retrieve factor
            factor = values[which_field]
            costs_disaggregation_all[case][cost_type] = factor
            factor_sum += factor

        # Written as "not <=" so that a NaN sum (e.g. from an empty cell) is rejected too:
        # any comparison against NaN evaluates to False
        if not abs(factor_sum - 1.0) <= 1e-5:
            raise ValueError(
                "The summation of factors for the disaggregation of costs into structural "
                "components, non-structural components and contents must be equal to 1. "
                f"Current sum for {case} occupancy is: {factor_sum:.6f}."
            )

    return costs_disaggregation_all
......@@ -85,8 +85,6 @@ class DataUnit:
Replacement cost of the non-structural components of all buildings.
Contents (float):
Replacement cost of the contents of all buildings.
self.currency (str):
Currency used in self.total_cost to express building replacement costs.
self.building_classes_proportions_and_properties (Pandas DataFrame):
Pandas DataFrame with the following columns:
building_class_name (str):
......@@ -139,7 +137,6 @@ class DataUnit:
total_dwellings=None,
total_people=_TOTAL_PEOPLE_DEFAULT,
total_cost=_TOTAL_COST_DEFAULT,
currency=None,
building_classes_proportions_and_properties=None,
):
"""
......@@ -168,8 +165,6 @@ class DataUnit:
Dictionary with the monetary value of all the buildings in the DataUnit (cost of
total_buildings). The keys should be: Total, Structural, Non-Structural,
Contents. See definitions in the description of DataUnit.total_cost.
currency (str):
Currency used in total_cost to express building replacement costs.
building_classes_proportions_and_properties (Pandas DataFrame):
Pandas DataFrame with the following columns:
building_class_name (str):
......@@ -212,7 +207,6 @@ class DataUnit:
self.total_dwellings = total_dwellings
self.total_people = total_people
self.total_cost = total_cost
self.currency = currency
self.building_classes_proportions_and_properties = (
building_classes_proportions_and_properties
)
......
......@@ -36,6 +36,8 @@ class ExposureEntity:
self.code (str):
3-character code that uniquely identifies this exposure entity. If the exposure
entity is a country, this is the ISO3 code for the country.
self.currency (str):
Currency used to express building replacement costs.
self.occupancy_cases (dict):
Dictionary defining the type, level and definition of the data units used for each
occupancy case of the model (e.g. residential, commercial), with the following
......@@ -91,22 +93,38 @@ class ExposureEntity:
| | Factor to obtain the population expected to be inside the
| | buildings during transit times (approx. 6 am to 10 am and
| | 6 pm to 10 pm).
| |_ costs_disaggregation (dict):
| | Dictionary containing factors by which the total cost per
| | building can be multiplied to obtain the replacement costs of
| | contents, structural and non-structural components. It contains
| | the following keys:
| | Structural (float):
| | Factor to obtain the cost of the structural components.
| | Non-Structural (float):
| | Factor to obtain the cost of the non-structural
| | components.
| | Contents (float):
| | Factor to obtain the cost of the building contents.
| | The sum of these three factors needs to be equal to 1.
|_ occupancy_cases.keys()[1]
| |_ data_units_type: ...
|_ ...
"""
def __init__(self, name, config_code):
def __init__(self, name, config_code, currency):
"""
Args:
name (str):
Name of the exposure entity.
config_code (str or dict):
Either "ISO3" (str) or a dictionary, of which name needs to be a key.
currency (str):
Currency used to express building replacement costs.
"""
self.name = name
self.code = self._interpret_exposure_entities_code(config_code)
self.currency = currency
self.occupancy_cases = {}
def create_data_unit_tiles(
......
......@@ -79,6 +79,18 @@ def test_ExposureModelESRM20(test_db):
== "unknown"
)
# Test case in which the sum of cost disaggregation factors is not 1
config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
)
config.data_pathname = os.path.join(
os.path.dirname(__file__), "data", "ExposureModelESRM20_sum_costs_error"
)
with pytest.raises(ValueError) as excinfo:
returned_aem = ExposureModelESRM20(config)
assert "ValueError" in str(excinfo.type)
# Test the non-trivial case using a test metadata file
config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
......@@ -160,10 +172,17 @@ def test_ExposureModelESRM20(test_db):
index_col=0, # Use first column as index
)
expected_costs_disaggregation = {
"residential": {"Structural": 0.3, "Non-Structural": 0.5, "Contents": 0.2},
"commercial": {"Structural": 0.2, "Non-Structural": 0.3, "Contents": 0.5},
"industrial": {"Structural": 0.15, "Non-Structural": 0.25, "Contents": 0.6},
}
# Compare
for i, exposure_entity in enumerate(returned_aem.exposure_entities.keys()):
assert returned_aem.exposure_entities[exposure_entity].name == expected_names[i]
assert len(returned_aem.exposure_entities[exposure_entity].occupancy_cases.keys()) == 3
assert returned_aem.exposure_entities[exposure_entity].currency == "EUR 2020"
for case in expected_occupancies:
assert case in list(
returned_aem.exposure_entities[exposure_entity].occupancy_cases.keys()
......@@ -187,6 +206,17 @@ def test_ExposureModelESRM20(test_db):
expected_population_time_distribution.loc[exposure_entity, column], 7
)
for cost_type in expected_costs_disaggregation[case].keys():
assert (
round(
returned_aem.exposure_entities[exposure_entity].occupancy_cases[case][
"costs_disaggregation"
][cost_type],
2,
)
== round(expected_costs_disaggregation[case][cost_type], 2)
)
# Go on to test the retrieval of the data units
exposure_entities_to_run = ["Entity_1", "Entity_2"]
occupancies_to_run = ["residential", "commercial"]
......
......@@ -41,7 +41,6 @@ def test_DataUnit(test_db):
geometry = geopandas.GeoSeries(geometries)
total_buildings = [1500.0, 740.6]
total_dwellings = [3000.0, 1851.5]
currency = "EUR"
aux_data = {target_column_name: unit_ids, "geometry": geometry}
geometries_table = geopandas.GeoDataFrame(aux_data)
geometries_table.crs = CRS("epsg:4326")
......@@ -53,13 +52,11 @@ def test_DataUnit(test_db):
target_column_name,
total_buildings=total_buildings[i],
total_dwellings=total_dwellings[i],
currency=currency,
)
assert returned_data_unit.id == unit_ids[i]
assert returned_data_unit.geometry == geometries[i]
assert returned_data_unit.total_buildings == total_buildings[i]
assert returned_data_unit.total_dwellings == total_dwellings[i]
assert returned_data_unit.currency == currency
# Test case in which the ID is not found
returned_data_unit = DataUnit("something", geometries_table, target_column_name)
......@@ -130,7 +127,6 @@ def test_DataUnit(test_db):
total_dwellings=total_dwellings[i],
total_people=total_people,
total_cost=total_cost,
currency=currency,
)
returned_data_unit.write_data_unit_to_database(
......
......@@ -30,14 +30,16 @@ def test_ExposureEntity_retrieve_country_ISO3():
def test_ExposureEntity():
returned_exposureentity = ExposureEntity("Greece", "ISO3")
returned_exposureentity = ExposureEntity("Greece", "ISO3", "EUR 2020")
assert returned_exposureentity.name == "Greece"
assert returned_exposureentity.code == "GRC"
assert returned_exposureentity.currency == "EUR 2020"
returned_exposureentity = ExposureEntity(
"South_America", {"South_America": "SRR", "Africa": "FFF"}
"South_America", {"South_America": "SRR", "Africa": "FFF"}, "USD 2015"
)
assert returned_exposureentity.name == "South_America"
assert returned_exposureentity.code == "SRR"
assert returned_exposureentity.currency == "USD 2015"
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment