Commit d98da817 authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Added feature to store assumptions on costs

parent ba721b39
Pipeline #35789 passed with stage
in 2 minutes and 32 seconds
......@@ -21,6 +21,7 @@ import iso3166
from multiprocessing import Pool
from functools import partial
from gdeimporter.tools.data_unit_tiles import DataUnitTilesHelper
from gdeimporter.tools.database import Database
logger = logging.getLogger()
......@@ -265,6 +266,136 @@ class ExposureEntity:
return
def write_costs_assumptions_to_database(
self,
db_data_units_config,
db_table,
aggregated_source_id,
occupancy_case,
):
"""This function writes the factors that can be used to disaggregate total replacement
costs of occupancy_case within the ExposureEntity into structural components,
non-structural components and contents, as well as the currency used to describe costs,
to the table with name db_table in the database whose credentials are indicated in
db_data_units_config. If an entry already exists in the database for this
ExposureEntity, occupancy_case and aggregated_source_id, the attributes are updated. If
an entry does not exist beforehand, it is created.
Args:
db_data_units_config (dict):
Dictionary containing the credentials needed to connect to the SQL database in
which information on the cost assumptions is stored. The keys of the dictionary
need to be:
host (str):
SQL database host address.
dbname (str):
Name of the SQL database.
port (int):
Port where the SQL database can be found.
username (str):
User name to connect to the SQL database.
password (str):
Password associated with self.username.
db_table (str):
Name of the table of the SQL database where the cost assumptions will be stored.
It is assumed that this table contains, at least, the following fields:
exposure_entity (str):
3-character code of the exposure entity.
occupancy_case (enum):
SQL enumerated type describing the building occupancy cases.
aggregated_source_id (int):
ID of the source of the aggregated exposure model.
structural (float):
Factor to obtain the cost of the structural components.
non_structural (float):
Factor to obtain the cost of the non-structural components.
contents (float):
Factor to obtain the cost of the building contents.
currency (str):
Currency used to specify replacement costs.
aggregated_source_id (int):
ID of the source of the aggregated exposure model.
occupancy_case (str):
Name of the occupancy case (e.g. "residential", "commercial", "industrial")
associated with this data_unit.
Returns:
This function writes to the table with name db_table in the database whose
credentials are indicated in db_data_units_config.
"""
sql_commands = {}
sql_commands["query"] = "SELECT COUNT(*) FROM %s"
sql_commands["query"] += " WHERE (exposure_entity='%s' AND occupancy_case='%s'"
sql_commands["query"] += " AND aggregated_source_id='%s');"
sql_commands["update"] = "UPDATE %s"
sql_commands["update"] += " SET (structural, non_structural, contents, currency)"
sql_commands["update"] += " = ('%s','%s','%s','%s')"
sql_commands["update"] += " WHERE (exposure_entity='%s' AND occupancy_case='%s'"
sql_commands["update"] += " AND aggregated_source_id='%s');"
sql_commands["insert"] = "INSERT INTO"
sql_commands["insert"] += " %s(exposure_entity, occupancy_case, aggregated_source_id,"
sql_commands["insert"] += " structural, non_structural, contents, currency)"
sql_commands["insert"] += " VALUES('%s','%s','%s','%s','%s','%s','%s');"
costs_disaggregation = self.occupancy_cases[occupancy_case]["costs_disaggregation"]
db_gde_tiles = Database(**db_data_units_config)
db_gde_tiles.create_connection_and_cursor()
# Check if an entry already exists for this exposure entity, occupancy case and
# aggregated source ID
db_gde_tiles.cursor.execute(
sql_commands["query"]
% (
db_table,
self.code,
occupancy_case,
str(aggregated_source_id),
)
)
exec_result = db_gde_tiles.cursor.fetchall()
if exec_result[0][0] > 0: # Entry exists --> update
db_gde_tiles.cursor.execute(
sql_commands["update"]
% (
db_table,
costs_disaggregation["Structural"],
costs_disaggregation["Non-Structural"],
costs_disaggregation["Contents"],
self.currency,
self.code,
occupancy_case,
str(aggregated_source_id),
)
)
elif exec_result[0][0] == 0: # No entry for this combination exists --> append
db_gde_tiles.cursor.execute(
sql_commands["insert"]
% (
db_table,
self.code,
occupancy_case,
str(aggregated_source_id),
costs_disaggregation["Structural"],
costs_disaggregation["Non-Structural"],
costs_disaggregation["Contents"],
self.currency,
)
)
else: # More than one entries found, this is an error
logger.error(
"ERROR IN write_costs_assumptions_to_database: MORE THAN ONE ENTRY FOUND FOR"
" exposure_entity='%s' AND occupancy_case='%s' AND aggregated_source_id='%s'"
% (self.code, occupancy_case, aggregated_source_id)
)
db_gde_tiles.close_connection()
return
def _interpret_exposure_entities_code(self, config_code):
"""This function interprets the value of exposure_entities_code given as configuration.
......
......@@ -72,6 +72,12 @@ def main():
occupancy_case,
aem_source_id,
)
aem.exposure_entities[exposure_entity_name].write_costs_assumptions_to_database(
config.database_gde_tiles,
"exposure_entities_costs_assumptions",
aem_source_id,
occupancy_case,
)
aem.exposure_entities[exposure_entity_name].create_data_unit_tiles(
occupancy_case,
config.number_cores,
......
......@@ -31,7 +31,8 @@ def test_db():
- obm_built_area_assessments (of the OBM Tiles database)
- aggregated_sources (of the GDE Tiles database)
- data_units (of the GDE Tiles database)
- data_unit_tiles (of the GDE Tiles database).
- data_unit_tiles (of the GDE Tiles database)
- exposure_entities_costs_assumptions (of the GDE Tiles database)
"""
init_test_db()
......@@ -40,7 +41,8 @@ def test_db():
def init_test_db():
"""Populates the test database that simulates to contain obm_built_area_assessments,
aggregated_sources, data_units and data_unit_tiles with a basic schema and data.
aggregated_sources, data_units, data_unit_tiles and exposure_entities_costs_assumptions with
a basic schema and data.
"""
if "GDEIMPORTER_DB_HOST" in os.environ: # When running the CI pipeline
......
......@@ -2,6 +2,7 @@ DROP TABLE IF EXISTS obm_built_area_assessments;
DROP TABLE IF EXISTS aggregated_sources;
DROP TABLE IF EXISTS data_units;
DROP TABLE IF EXISTS data_unit_tiles;
DROP TABLE IF EXISTS exposure_entities_costs_assumptions;
DROP TYPE IF EXISTS occupancycase;
CREATE TYPE occupancycase AS ENUM ('residential', 'commercial', 'industrial');
......@@ -86,3 +87,24 @@ INSERT INTO data_units(data_unit_id,
people_census,
cost_total)
VALUES ('ABC_123456','residential',17,'ABC',0.0,0.0,0.0,0.0);
CREATE TABLE exposure_entities_costs_assumptions
(
aggregated_source_id SMALLINT,
exposure_entity CHAR(3),
occupancy_case occupancycase,
structural FLOAT,
non_structural FLOAT,
contents FLOAT,
currency VARCHAR,
PRIMARY KEY (exposure_entity, occupancy_case, aggregated_source_id)
);
INSERT INTO exposure_entities_costs_assumptions(exposure_entity,
occupancy_case,
aggregated_source_id,
structural,
non_structural,
contents,
currency)
VALUES ('GRC','commercial',19,0.0,0.0,0.0,'USD');
......@@ -16,8 +16,11 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import os
import logging
from gdeimporter.configuration import Configuration
from gdeimporter.exposureentity import ExposureEntity
from gdeimporter.tools.database import Database
logger = logging.getLogger()
......@@ -43,3 +46,117 @@ def test_ExposureEntity():
assert returned_exposureentity.name == "South_America"
assert returned_exposureentity.code == "SRR"
assert returned_exposureentity.currency == "USD 2015"
def test_ExposureEntity_write_costs_assumptions_to_database(test_db):
# Create an exposure entity
returned_exposureentity = ExposureEntity("Greece", "ISO3", "EUR 2020")
returned_exposureentity.occupancy_cases = {
"residential": {
"costs_disaggregation": {"Structural": 0.3, "Non-Structural": 0.5, "Contents": 0.2},
},
"commercial": {
"costs_disaggregation": {"Structural": 0.2, "Non-Structural": 0.3, "Contents": 0.5},
},
"industrial": {
"costs_disaggregation": {
"Structural": 0.15,
"Non-Structural": 0.25,
"Contents": 0.6,
},
},
}
# Load configuration file to retrieve database credentials
config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
)
for occupancy_case in returned_exposureentity.occupancy_cases:
returned_exposureentity.write_costs_assumptions_to_database(
config.database_gde_tiles,
"exposure_entities_costs_assumptions",
19,
occupancy_case,
)
# Test that cost assumptions have been stored correctly
for occupancy_case in returned_exposureentity.occupancy_cases:
query_result = query_costs_assumptions(
config.database_gde_tiles, returned_exposureentity.code, occupancy_case, 19
)
assert len(query_result) == 1 # one entry found in the database
assert round(query_result[0][0], 5) == round(
returned_exposureentity.occupancy_cases[occupancy_case]["costs_disaggregation"][
"Structural"
],
5,
)
assert round(query_result[0][1], 5) == round(
returned_exposureentity.occupancy_cases[occupancy_case]["costs_disaggregation"][
"Non-Structural"
],
5,
)
assert round(query_result[0][2], 5) == round(
returned_exposureentity.occupancy_cases[occupancy_case]["costs_disaggregation"][
"Contents"
],
5,
)
assert query_result[0][3] == "EUR 2020" # currency
def query_costs_assumptions(credentials, exposure_entity, occupancy_case, aggregated_source_id):
"""This auxiliary function queries the 'exposure_entities_costs_assumptions' table of the
test database to find all entries corresponding to 'exposure_entity', 'occupancy_case' and
'aggregated_source_id'.
Args:
credentials (dict):
Dictionary containing the credentials needed to connect to the test SQL database.
The keys of the dictionary need to be:
host (str):
SQL database host address.
dbname (str):
Name of the SQL database.
port (int):
Port where the SQL database can be found.
username (str):
User name to connect to the SQL database.
password (str):
Password associated with self.username.
exposure_entity (str):
3-character string identifying an exposure entity.
occupancy_case (str):
Occupancy case.
aggregated_source_id (int):
ID of the aggregated exposure model source associated with exposure_entity.
Returns:
result (list of tuples):
Each tuple of the list corresponds to one entry in the 'data_units' table of the
test database that matches the search criteria. The elements within the tuple are:
structural (float):
Factor to obtain the cost of the structural components.
non_structural (float):
Factor to obtain the cost of the non-structural components.
contents (float):
Factor to obtain the cost of the building contents.
currency (str):
Currency used to specify replacement costs.
"""
sql_command = (
"SELECT structural, non_structural, contents, currency"
" FROM exposure_entities_costs_assumptions"
" WHERE (exposure_entity='%s' AND occupancy_case='%s' AND aggregated_source_id='%s');"
% (exposure_entity, occupancy_case, aggregated_source_id)
)
db_test = Database(**credentials)
db_test.create_connection_and_cursor()
db_test.cursor.execute(sql_command)
result = db_test.cursor.fetchall()
return result
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment