Skip to content
Snippets Groups Projects

Add exposure import functions

Merged Danijel Schorlemmer requested to merge feature/import-exposure into main
@@ -22,8 +22,11 @@ import sys
import argparse
import configparser
import datetime
import glob
import csv
from databaselib.database import PostGISDatabase
from exposurelib.database import SpatiaLiteExposure, PostGISExposure
from exposurelib.utils import add_occupancy_to_taxonomy
# Add a logger printing error, warning, info and debug messages to the screen
@@ -155,6 +158,212 @@ class ExposureInitializer:
self.exposure_db.connection.commit()
tile_db.close()
@staticmethod
def add_asset_to_dict(asset_dict, taxonomy_id, asset):
"""
Adds an asset to an asset list by first searching for an asset with the same taxonomy in
the list. If found, the asset is added to the existing one, otherwise the list is
extended by the new asset.
Args:
asset_dict (dict of dicts):
Dictionary of assets.
taxonomy_id (int):
ID of the taxonomy (refers to `id` in table `Taxonomy`)
asset (dict):
Asset to be added to `asset_dict` under the key `taxonomy_id`.
Each asset contains of:
number : number of buildings or proportion of building
structural: structural costs of the asset
night : nighttime population of the asset
Returns:
The asset dictionary with the new asset added
"""
if asset_dict.get(taxonomy_id, None) is None:
asset_dict[taxonomy_id] = {
"number": asset["number"],
"structural": asset["structural"],
"night": asset["night"],
}
else:
asset_dict[taxonomy_id]["number"] += asset["number"]
asset_dict[taxonomy_id]["structural"] += asset["structural"]
asset_dict[taxonomy_id]["night"] += asset["night"]
return asset_dict
def process_district(self, boundary_id, country_iso_code, asset_dict):
    """
    Processes all assets provided in `asset_dict` for one district with `boundary_id`.

    First, the asset list is reduced to unique taxonomy entries by summing up all key
    values per taxonomy. Second, all tiles within the boundary with `boundary_id` are
    identified and their proportion of the built area compared to the sum of the built area
    of the selected tiles is computed. Third, for each tile with a built area a
    reference entity is created and the reduced asset list is inserted as reference assets
    with all values multiplied by the proportion factor, hereby distributing all assets of
    the exposure model proportionally on the built area to the tiles (entities).

    Args:
        boundary_id (str):
            ID of the district boundary
        country_iso_code (str):
            ISO 3166-1 alpha-3 code of the country
        asset_dict (dict):
            Dictionary of asset values keyed by taxonomy ID (see `add_asset_to_dict`)
    """
    # Get all tiles within the boundary, estimate their built_area proportion and
    # create the EntityReference and AssetReference datasets
    logger.info("Query the built-area proportion per tile")
    # Query explanation: Subquery `A` selects all tiles and is joined with subquery `B`
    # selecting the boundary, resulting in subquery `T` delivering all tiles for which the
    # centroid is located within the boundary. This subquery `T` is then used twice, first
    # in subquery `SumQuery` to estimate the sum of the built-area and the cross-joined
    # query retrieving the built-area size per tile to compute the built-area proportion.
    # The extra `A.geometry && B.border` bounding-box test is only added on the PostGIS
    # backend — presumably to let the spatial index prune candidates before the exact
    # ST_Contains test; SpatiaLite gets the plain filter. TODO confirm.
    if isinstance(self.exposure_db, SpatiaLiteExposure):
        where_clause = "WHERE built_area_size IS NOT NULL"
    else:
        where_clause = "WHERE A.geometry && B.border AND built_area_size IS NOT NULL"
    # NOTE(review): the SQL below is assembled via f-string interpolation, so
    # `boundary_id` is trusted not to contain quote characters — it originates from
    # the exposure CSV files, not from end-user input. Verify before exposing this
    # method to untrusted callers.
    sql_statement = f"""
        WITH T AS
        (
            SELECT quadkey, built_area_size
            FROM
            (
                SELECT quadkey, built_area_size, country_iso_code,
                       {self.exposure_db.geometry_field}
                FROM {self.exposure_db.tile_view}
            ) AS A
            INNER JOIN
            (
                SELECT {self.exposure_db.geometry_field} AS border
                FROM {self.exposure_db.boundary_table}
                WHERE {self.exposure_db.boundary_id_field}= '{boundary_id}'
            ) AS B
            ON ST_Contains(B.border, ST_Centroid(A.{self.exposure_db.geometry_field}))
            {where_clause}
        )
        SELECT T.quadkey, T.built_area_size/SumQuery.total_sum as proportion
        FROM T
        CROSS JOIN
        (
            SELECT SUM(T.built_area_size) as total_sum FROM T
        ) AS SumQuery
        """
    self.exposure_db.cursor.execute(sql_statement)
    tiles = self.exposure_db.cursor.fetchall()
    logger.info("Insert assets for each tile in boundary %s" % boundary_id)
    for quadkey, proportion in tiles:
        # Check if entity exists in EntityReference and create if necessary
        entity_id = self.exposure_db.get_reference_entity_id(quadkey)
        if entity_id is None:
            entity_id = self.exposure_db.insert_reference_entity(quadkey, country_iso_code)
        # Add the respective assets by proportion of built area to `AssetReference`,
        # scaling each value by the tile's share of the district's built area.
        for taxonomy_id, value in asset_dict.items():
            sql_statement = """
                INSERT INTO AssetReference
                    (entity_id, taxonomy_id, number, structural, night)
                VALUES (%d, %d, %f, %f, %f)
                """ % (
                entity_id,
                int(taxonomy_id),  # taxonomy_id
                float(value["number"]) * proportion,  # number
                float(value["structural"]) * proportion,  # structural
                float(value["night"]) * proportion,  # night
            )
            self.exposure_db.cursor.execute(sql_statement)
    # Commit once after all tiles of the district have been inserted.
    self.exposure_db.connection.commit()
def import_exposure(self, exposure_model_search_pattern, country_iso_code):
    """
    Imports an OpenQuake-compatible aggregated exposure model into baseline entities and
    baseline assets.

    The function reads in all exposure models matching the search pattern. Each file is
    sorted by boundary ID so that all assets of one district are listed consecutively.
    The function collects all assets per district and passes them to the
    `process_district` function to be processed and stored in the database. It also
    collects all assets of the entire country, normalizes them and stores them as the
    country-average asset distribution in `AssetCountry`.

    Args:
        exposure_model_search_pattern (str):
            Search pattern to identify all necessary exposure filepaths.
        country_iso_code (str):
            ISO 3166-1 alpha-3 code of the country.
    """
    country_asset_dict = {}
    # Iterate through all given exposure files
    for exposure_filepath in glob.glob(exposure_model_search_pattern):
        logger.info(f"Processing {exposure_filepath}")
        # Use a context manager so the file handle is always closed
        # (the original left the handle open until garbage collection).
        with open(exposure_filepath, newline="") as exposure_file:
            csv_reader = csv.DictReader(exposure_file, delimiter=",")
            # Sort the exposure file by boundary ID to have all assets of one district
            # being listed consecutively to avoid listing same taxonomies multiple times
            sorted_exposure = sorted(csv_reader, key=lambda line: line["BOUNDARY_ID"])
        # Prepare the control variables
        last_boundary_id = None
        location_count = 0
        asset_dict = {}
        boundary_id = None
        for row in sorted_exposure:
            # Check if the line starts the asset list of a new location
            if last_boundary_id != row["BOUNDARY_ID"]:
                # Flush the previous district before starting a new one. Skip the
                # very first row of the file: no assets have been collected yet and
                # `boundary_id` is still None (the original code called
                # `process_district` unconditionally here).
                if location_count > 0:
                    self.process_district(boundary_id, country_iso_code, asset_dict)
                location_count += 1
                boundary_id = row["BOUNDARY_ID"]
                asset_dict = {}  # Reset the location-based asset dictionary
                last_boundary_id = row["BOUNDARY_ID"]
            # Read in an asset.
            # Add the occupancy to the taxonomy string and store it in the database
            taxonomy = add_occupancy_to_taxonomy(
                row["TAXONOMY"], str(row["OCCUPANCY"]).upper()
            )
            if not self.exposure_db.taxonomy_string_exists(taxonomy):
                self.exposure_db.insert_taxonomy(taxonomy)
            taxonomy_id = self.exposure_db.get_taxonomy_id(taxonomy)
            asset = {
                "number": float(row["BUILDINGS"]),
                "structural": float(row["COST_STRUCTURAL_EUR"]),
                "night": float(row["OCCUPANTS_PER_ASSET_NIGHT"]),
            }
            # Store the asset in a location-based list and country-based list
            asset_dict = self.add_asset_to_dict(asset_dict, taxonomy_id, asset)
            country_asset_dict = self.add_asset_to_dict(
                country_asset_dict, taxonomy_id, asset
            )
        # Flush the last district of the file (guard against empty files,
        # for which `boundary_id` would still be None).
        if asset_dict:
            self.process_district(boundary_id, country_iso_code, asset_dict)
    logger.info("Assign the country-average assets")
    # Normalize the country-average asset distribution by the total building count
    sum_number = sum(value["number"] for value in country_asset_dict.values())
    if country_asset_dict and sum_number == 0:
        # All assets have a zero building count; normalization would divide by zero.
        logger.warning("Total building count is zero, skipping country-average assets")
        return
    # Assign the country-average asset distribution to the country assets
    for taxonomy_id, value in country_asset_dict.items():
        sql_statement = """
            INSERT INTO AssetCountry
                (country_iso_code, taxonomy_id, number, number_normalized, structural,
                structural_normalized, night, night_normalized)
            VALUES ('%s', %d, %f, %f, %f, %f, %f, %f)
            """ % (
            country_iso_code,
            int(taxonomy_id),
            float(value["number"]),
            float(value["number"]) / sum_number,
            float(value["structural"]),
            float(value["structural"]) / sum_number,
            float(value["night"]),
            float(value["night"]) / sum_number,
        )
        self.exposure_db.cursor.execute(sql_statement)
    self.exposure_db.connection.commit()
def command_line_interface():
"""
@@ -274,7 +483,7 @@ def command_line_interface():
else:
distributor.exposure_db.clean_up_reference(country_iso_code)
distributor.exposure_db.clean_up_asset_country(country_iso_code)
# distributor.import_exposure(exposure_model, country_iso_code)
distributor.import_exposure(exposure_model, country_iso_code)
# distributor.process_gaps_in_exposure(country_iso_code, country_boundary_id)
distributor.exposure_db.commit_and_close()
Loading