# exposureinitializer.py
#!/usr/bin/env python3
# Copyright (c) 2022:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import logging
import sys
import argparse
import configparser
import datetime
import glob
import csv
from databaselib.database import PostGISDatabase
from exposurelib.database import SpatiaLiteExposure, PostGISExposure
from taxonomylib import Taxonomy
# Configure the root logger so that error, warning, info and debug
# messages are printed to the screen (stdout).
logger = logging.getLogger()
logger.setLevel(logging.INFO)
_stdout_handler = logging.StreamHandler(sys.stdout)
logger.addHandler(_stdout_handler)
class ExposureInitializer:
    """
    This class imports OpenQuake-type exposure models into the baseline exposure per tile. It
    uses as database either a SpatiaLite database for local storage of baseline exposure models
    or a PostGIS database for server-based storage.

    Args:
        exposure_database_object (AbstractExposure):
            Exposure database class as defined in the `exposure-lib`. Use either an instance of
            `SpatiaLiteExposure` for storing data in a local SpatiaLite database or
            `PostGISExposure` for storing data in a server-based PostGIS database.
        exposure_search_pattern (str):
            Search pattern to identify all necessary exposure filepaths.
        postgis_config (Dictionary):
            Dictionary with various configurations for database access:
            Under the section `Tile` the following items need to be provided.
                host (str):
                    Host name of the database server.
                dbname (str):
                    Name of the PostGIS database.
                port (str):
                    Port to access the database server.
                username (str):
                    Name of the user to access the database.
                password (str):
                    Password of the user to access the database.
    """

    def __init__(
        self,
        exposure_database_object,
        exposure_search_pattern,
        postgis_config,
    ):
        self.exposure_search_pattern = exposure_search_pattern
        self.postgis_config = postgis_config
        # Keep a handle on the exposure database and open its connection right
        # away so every method can assume a live cursor/connection.
        self.exposure_db = exposure_database_object
        self.exposure_db.connect()
def retrieve_tile_set_and_boundaries(
self,
country_iso_code,
):
"""
Retrieves all boundaries of the given country and all tiles (zoom-level 18) within the
country boundary (defined as ISO-code + `-COUNTRY`). For each tile, the built area and
completeness values are retrieved too based on the `tile_best_estimate` view on the
tile database.
Args:
country_iso_code (str):
ISO 3166-1 alpha-3 code of the country
"""
logger.info("Retrieving tiles for country ISO code %s" % country_iso_code)
# Connect to tile database
tile_db_config = {
"host": self.postgis_config["Tile"]["host"],
"dbname": self.postgis_config["Tile"]["dbname"],
"port": self.postgis_config["Tile"]["port"],
"username": self.postgis_config["Tile"]["username"],
"password": self.postgis_config["Tile"]["password"],
}
tile_db = PostGISDatabase(**tile_db_config)
tile_db.connect()
# Retrieve all tiles within the respective country
sql_statement = f"""
SELECT quadkey, ST_AsText(geometry), built_area_size, built_up_ratio, completeness
FROM tile_best_estimate
WHERE country_iso_code = '{country_iso_code}'
"""
tile_db.cursor.execute(sql_statement)
# Inserting all retrieved tiles into the local SpatiaLite database
logger.info("Inserting tiles to local database")
num_tiles = 0
for (
quadkey,
geometry,
built_area_size,
built_up_ratio,
completeness,
) in tile_db.cursor:
sql_statement = f"""
INSERT INTO Tile
(quadkey, country_iso_code, built_area_size, built_up_ratio, completeness, geom)
VALUES (
'{quadkey}',
'{country_iso_code}',
{built_area_size if built_area_size else 'NULL'},
{built_up_ratio if built_up_ratio else 'NULL'},
{completeness if completeness else 'NULL'},
{"CastToMultipolygon(GeomFromText('%s', 4326))" % geometry}
)
"""
self.exposure_db.cursor.execute(sql_statement)
num_tiles += 1
if num_tiles % 10000 == 0:
logger.info("%d tiles inserted" % num_tiles)
logger.info("All tiles inserted")
# Retrieve all boundaries for which the ID starts with the country ISO code
logger.info("Retrieving boundaries")
sql_statement = f"""
SELECT boundary_id, ST_AsText(geometry) FROM Boundaries
WHERE boundary_id LIKE '{country_iso_code}%'
"""
tile_db.cursor.execute(sql_statement)
for (boundary_id, geometry_wkt) in tile_db.cursor:
sql_statement = f"""
INSERT INTO Boundary
(id, geom)
VALUES
(
'{boundary_id}',
{"CastToMultipolygon(GeomFromText('%s', 4326))" % geometry_wkt}
)
"""
self.exposure_db.cursor.execute(sql_statement)
logger.info("All boundaries inserted")
self.exposure_db.connection.commit()
tile_db.close()
@staticmethod
def add_asset_to_dict(asset_dict, taxonomy_id, asset):
"""
Adds an asset to an asset list by first searching for an asset with the same taxonomy in
the list. If found, the asset is added to the existing one, otherwise the list is
extended by the new asset.
Args:
asset_dict (dict of dicts):
Dictionary of assets.
taxonomy_id (int):
ID of the taxonomy (refers to `id` in table `Taxonomy`)
asset (dict):
Asset to be added to `asset_dict` under the key `taxonomy_id`.
Each asset contains of:
number : number of buildings or proportion of building
structural: structural costs of the asset
night : nighttime population of the asset
Returns:
The asset dictionary with the new asset added
"""
if asset_dict.get(taxonomy_id, None) is None:
asset_dict[taxonomy_id] = {
"number": asset["number"],
"structural": asset["structural"],
"night": asset["night"],
}
else:
asset_dict[taxonomy_id]["number"] += asset["number"]
asset_dict[taxonomy_id]["structural"] += asset["structural"]
asset_dict[taxonomy_id]["night"] += asset["night"]
return asset_dict
    def process_district(self, boundary_id: str, country_iso_code: str, asset_dict: dict) -> None:
        """
        Processes all assets provided in `asset_dict` for one district with `boundary_id`.
        First, the asset list is reduced to unique taxonomy entries by summing up all key
        values per taxonomy. Second, all tiles within the boundary with `boundary_id` are
        identified and their proportion of the built area compared to the sum of the built area
        of the selected tiles is computed. Third, for each tile with a built area a
        reference entity is created and the reduced asset list is inserted as reference assets
        with all values multiplied by the proportion factor, hereby distributing all assets of
        the exposure model proportionally on the built area to the tiles (entities).

        Args:
            boundary_id (str):
                ID of the district boundary
            country_iso_code (str):
                ISO 3166-1 alpha-3 code of the country
            asset_dict (dict):
                Dictionary of asset values keyed by taxonomy ID (see `add_asset_to_dict`)

        Raises:
            ValueError:
                If `boundary_id` is not present in the `Boundary` table.
        """
        # Check if the boundary exists in the database and retrieve it
        boundary_geometry = self.exposure_db.get_boundary_geometry(boundary_id)
        if boundary_geometry is None:
            raise ValueError(f"Boundary {boundary_id} does not exist in the database.")
        # Define the basic part of the queries to estimate the built-area proportion of all
        # tiles within a boundary to create the EntityReference and AssetReference datasets.
        # Explanation: the query selects all tiles for which the centroid is located within the
        # given boundary and the built-area size is not `NULL`. This query is used twice, first
        # to calculate the sum of the built-area and then to retrieve the built-area size per
        # tile (later turned into built-area proportion).
        where_clause = f"""
            WHERE ST_Contains(
                ST_GeomFromText('{boundary_geometry}', 4326),
                ST_Centroid({self.exposure_db.geometry_field})
            )
            AND built_area_size IS NOT NULL"""
        if isinstance(self.exposure_db, PostGISExposure):
            # PostGIS only: add a bounding-box overlap pre-filter (`&&`) so tiles
            # far from the boundary are discarded before the exact ST_Contains test.
            where_clause += f"""
            AND {self.exposure_db.geometry_field}
            && ST_GeomFromText('{boundary_geometry}', 4326)
            """
        basic_sql_statement = f"""
            SELECT quadkey, built_area_size, {self.exposure_db.geometry_field}
            FROM {self.exposure_db.tile_view}
            {where_clause}
        """
        # Calculate the total built area of the district
        sql_statement = f"""
            SELECT SUM(A.built_area_size) AS total_built_area
            FROM ({basic_sql_statement}) AS A
        """
        self.exposure_db.cursor.execute(sql_statement)
        total_built_area = self.exposure_db.cursor.fetchone()[0]
        if total_built_area is None:
            # SUM(...) over zero rows yields NULL: the district has no tiles
            # with built area, so there is nothing to distribute.
            logger.warning(f"District {boundary_id} contains no tiles with built area.")
            return
        # Retrieve all tiles with built area
        sql_statement = f"""
            SELECT A.quadkey, A.built_area_size
            FROM ({basic_sql_statement}) AS A
        """
        self.exposure_db.cursor.execute(sql_statement)
        tiles = self.exposure_db.cursor.fetchall()
        # Add reference entities to all tiles of the district
        for quadkey, built_area_size in tiles:
            # Check if entity exists in EntityReference and create if necessary
            entity_id = self.exposure_db.get_reference_entity_id(quadkey)
            if entity_id is None:
                entity_id = self.exposure_db.insert_reference_entity(quadkey, country_iso_code)
            # Add the respective assets by proportion of built area to `AssetReference`
            proportion = built_area_size / total_built_area
            reference_assets = [
                [
                    entity_id,
                    taxonomy_id,
                    float(value["number"]) * proportion,
                    float(value["structural"]) * proportion,
                    float(value["night"]) * proportion,
                ]
                for taxonomy_id, value in asset_dict.items()
            ]
            self.exposure_db.insert_reference_assets(reference_assets)
        self.exposure_db.connection.commit()
        logger.info(
            f"{boundary_id}: {len(tiles)} entities added, built-area size: {total_built_area}"
        )
def import_exposure(self, exposure_model_search_pattern, country_iso_code):
"""
Imports an OpenQuake-compatible aggregated exposure model into baseline entities and
baseline assets. The function reads in all exposure models matching the search
pattern. Each file is read line by line assuming that all assets of one location are
stored consecutively in the file. The function collects all assets per district and
passes them to the `process_district` function to be processed and stored in the
database. It also collects all assets of the entire country, normalizes them and stores
them as the country-average asset distribution in `AssetCountry`.
Args:
exposure_model_search_pattern (str):
Search pattern to identify all necessary exposure filepaths.
country_iso_code (str):
ISO 3166-1 alpha-3 code of the country.
"""
country_asset_dict = {}
# Iterate through all given exposure files
for exposure_filepath in glob.glob(exposure_model_search_pattern):
logger.info(f"Processing {exposure_filepath}")
csv_reader = csv.DictReader(open(exposure_filepath), delimiter=",")
# Sort the exposure file by boundary ID to have all assets of one district being
# listed consecutively to avoid listing same taxonomies multiple times
sorted_exposure = sorted(csv_reader, key=lambda line: line["BOUNDARY_ID"])
# Prepare the control variables
last_boundary_id = None
location_count = 0
asset_dict = {}
boundary_id = None
for row in sorted_exposure:
# Check if the line starts the asset list of a new location
if not (last_boundary_id == row["BOUNDARY_ID"]):
if location_count > 0:
pass
self.process_district(boundary_id, country_iso_code, asset_dict)
location_count += 1
boundary_id = row["BOUNDARY_ID"]
asset_dict = {} # Reset the location-based asset dictionary
last_boundary_id = row["BOUNDARY_ID"]
# Read in an asset
# Create the expanded taxonomy string and add the occupancy to it
taxonomy = Taxonomy(row["TAXONOMY"])
taxonomy.set_section("occupancy", row["OCCUPANCY"].upper())
taxonomy_string = taxonomy.get_expanded_string()
# Store the expanded taxonomy string in the database
if not self.exposure_db.taxonomy_string_exists(taxonomy_string):
self.exposure_db.insert_taxonomy(taxonomy_string)
taxonomy_id = self.exposure_db.get_taxonomy_id(taxonomy_string)
asset = {
"number": float(row["BUILDINGS"]),
"structural": float(row["COST_STRUCTURAL_EUR"]),
"night": float(row["OCCUPANTS_PER_ASSET_NIGHT"]),
}
# Store the asset in a location-based list and country-based list
asset_dict = self.add_asset_to_dict(asset_dict, taxonomy_id, asset)
country_asset_dict = self.add_asset_to_dict(
country_asset_dict, taxonomy_id, asset
)
self.process_district(boundary_id, country_iso_code, asset_dict)
logger.info("Assign the country-average assets")
# Normalize the country-average asset distribution
sum_number = 0
for taxonomy_id, value in country_asset_dict.items():
sum_number += value["number"]
# Assign the country-average asset distribution to the country assets
for taxonomy_id, value in country_asset_dict.items():
sql_statement = f"""
INSERT INTO AssetCountry
(country_iso_code, taxonomy_id, number, number_normalized, structural,
structural_normalized, night, night_normalized)
VALUES ('{country_iso_code}',
{int(taxonomy_id)},
{float(value["number"])},
{float(value["number"]) / sum_number},
{float(value["structural"])},
{float(value["structural"]) / sum_number},
{float(value["night"])},
{float(value["night"]) / sum_number})
"""
self.exposure_db.cursor.execute(sql_statement)
self.exposure_db.connection.commit()
    def process_gaps_in_exposure(self, country_iso_code: str) -> None:
        """
        In the case of missing exposure data for districts in the country, tiles containing
        built area will not be covered with assets. Also, tiles at the border with the centroid
        located outside the border but inside a wider country border (e.g. coastal tiles) need
        to be processed and filled with assets. This function detects such tiles and adds
        reference entities for these tiles and uses the country-average asset distribution when
        adding the reference assets in the proportion of the tile built area compared to the
        total built area of the country.

        If only parts of the exposure model cover a district, this function will not detect
        this. Even if significant parts of the country are not included in the model, the
        Initializer will only distribute the number of buildings in the model over the entire
        country.

        Args:
            country_iso_code (str):
                ISO 3166-1 alpha-3 code of the country
        """
        # Get the assets of the average building in the country
        sql_statement = f"""
            SELECT taxonomy_id, number, structural, night
            FROM AssetCountry
            WHERE country_iso_code = '{country_iso_code}'
        """
        self.exposure_db.cursor.execute(sql_statement)
        country_asset_list = self.exposure_db.cursor.fetchall()
        # Calculate the total built area in the country
        logger.info(f"Calculating the total built area in country {country_iso_code}.")
        sql_statement = f"""
            SELECT SUM(built_area_size) AS total_built_area
            FROM {self.exposure_db.tile_view}
            WHERE (country_iso_code = '{country_iso_code}')
            AND (built_area_size IS NOT NULL)
        """
        self.exposure_db.cursor.execute(sql_statement)
        # NOTE: if no tile has a built area, SUM(...) yields NULL/None here; in that
        # case the gap query below also returns zero rows and the function exits
        # before dividing by `total_built_area`, so the division is safe.
        total_built_area = self.exposure_db.cursor.fetchone()[0]
        logger.info(f"Total built area in country {country_iso_code}: {total_built_area}.")
        # Query explanation: Subquery A selects all tiles located in the country for the given
        # ISO code and the built area not being `NULL`. A is joined with the table
        # `EntityReference` to identify the tiles without a reference entity and the joined
        # query is used to retrieve the built-area size per selected tile.
        logger.info("Retrieving all tiles with built area but without reference entities.")
        sql_statement = f"""
            SELECT A.quadkey, built_area_size
            FROM
            (
                SELECT quadkey, built_area_size
                FROM {self.exposure_db.tile_view}
                WHERE (country_iso_code = '{country_iso_code}')
                AND (built_area_size IS NOT NULL)
            ) AS A
            LEFT JOIN EntityReference AS E
            ON E.quadkey = A.quadkey
            WHERE E.quadkey IS NULL
        """
        self.exposure_db.cursor.execute(sql_statement)
        tiles = self.exposure_db.cursor.fetchall()
        logger.info(
            f"{len(tiles)} tiles without reference entities found in {country_iso_code}."
        )
        if len(tiles) == 0:
            return
        for quadkey, built_area_size in tiles:
            # Check if entity exists in EntityReference and create if necessary
            entity_id = self.exposure_db.get_reference_entity_id(quadkey)
            if entity_id is None:
                entity_id = self.exposure_db.insert_reference_entity(quadkey, country_iso_code)
            # Add the respective assets by proportion of built area to `AssetReference`
            proportion = built_area_size / total_built_area
            reference_assets = [
                [
                    entity_id,
                    int(taxonomy_id),
                    float(number) * proportion,
                    float(structural) * proportion,
                    float(night) * proportion,
                ]
                for taxonomy_id, number, structural, night in country_asset_list
            ]
            self.exposure_db.insert_reference_assets(reference_assets)
        self.exposure_db.connection.commit()
def create_view(self, country_iso_code):
"""
Create a view of number of buildings per reference entity (tile) to be used in QGIS.
The view is named as the `country_iso_code` in lower case.
Args:
country_iso_code (str):
ISO 3166-1 alpha-3 code of the country
"""
name = country_iso_code.lower() + "_number_buildings_per_entity"
sql_statement = """
CREATE VIEW IF NOT EXISTS %s AS
SELECT id as id_entity, EntityReference.geom as geometry,
SUM(AssetReference.number) as number_buildings
FROM EntityReference
INNER JOIN AssetReference ON EntityReference.id = AssetReference.entity_id
WHERE EntityReference.country_iso_code = '%s'
GROUP BY AssetReference.entity_id
""" % (
name,
country_iso_code,
)
self.exposure_db.connection.execute(sql_statement)
logger.info("View %s created" % name)
# Insert necessary entry to the `view_geometry_columns` table
sql_statement = (
"""
INSERT INTO views_geometry_columns
(view_name, view_geometry, view_rowid, f_table_name, f_geometry_column, read_only)
VALUES ('%s', 'geometry', 'id_entity', 'entityreference', 'geom', 1)
"""
% name
)
self.exposure_db.connection.execute(sql_statement)
self.exposure_db.connection.commit()
logger.debug("Geometry entry for view %s created" % name)
def command_line_interface():
    """
    Command-line interface of the exposure-initializer. The first command-line option is the
    command defining what type of database will be used:
        `local` : Exposure will be generated in a local SpatiaLite database.
        `server`: Exposure will be generated in a server-based PostGIS database.
    The command require these mandatory options:
        `-e`: Filepath of the exposure database (`local` only).
        `-c`: Config file (INI-type) describing the access to the tile database. Also needed
              for `local` as the boundaries and tiles will be copied over to the local
              SpatiaLite database.
        `-m`: File-search pattern for the OpenQuake exposure model files to be imported.
        `-i`: ISO 3166-1 alpha-3 code of the country for which the exposure is to be imported.
    """
    # Build the top-level parser plus a parent parser that carries the options
    # shared by both sub-commands.
    main_parser = argparse.ArgumentParser(description="Exposure-Initializer")
    shared_parser = argparse.ArgumentParser(add_help=False)
    shared_parser.add_argument(
        "-c",
        "--config-file",
        required=True,
        type=str,
        help="Filepath of the config file for accessing the Tiles database",
    )
    shared_parser.add_argument(
        "-m",
        "--exposure-model",
        required=True,
        type=str,
        help="Search pattern for exposure files",
    )
    shared_parser.add_argument(
        "-i",
        "--country-iso-code",
        required=True,
        type=str,
        help="ISO 3166-1 alpha-3 code of the country",
    )
    # Register the two sub-commands.
    subcommands = main_parser.add_subparsers(help="sub-command help", dest="command")
    local_parser = subcommands.add_parser(
        "local", help="Run initializer on a local SpatiaLite database", parents=[shared_parser]
    )
    local_parser.add_argument(
        "-e",
        "--exposure-database",
        required=True,
        type=str,
        help="Filepath to the SpatiaLite exposure database",
    )
    subcommands.add_parser(
        "server",
        help="Run initializer on a server-based PostGIS database",
        parents=[shared_parser],
    )
    # Read arguments from command line
    cli_args = main_parser.parse_args()
    command = cli_args.command
    exposure_model = cli_args.exposure_model
    country_iso_code = cli_args.country_iso_code
    # Create dictionary from INI-style config file
    postgis_config = configparser.ConfigParser()
    postgis_config.read(cli_args.config_file)
    # Select the exposure database backend according to the sub-command.
    exposure_database_object = None
    if command == "local":
        exposure_database_object = SpatiaLiteExposure(cli_args.exposure_database)
    elif command == "server":
        exposure_database_object = PostGISExposure(**dict(postgis_config["Exposure"]))
    # Initialize timer
    start_time = datetime.datetime.now()
    logger.info("Processing start time: %s" % start_time)
    if exposure_database_object is None:
        logger.info("No database connection available")
    else:
        initializer = ExposureInitializer(
            exposure_database_object, exposure_model, postgis_config
        )
        if command == "local":
            # A fresh local database: create the schema and copy over tiles,
            # boundaries and the QGIS helper view.
            initializer.exposure_db.create_tables()
            initializer.retrieve_tile_set_and_boundaries(country_iso_code)
            initializer.create_view(country_iso_code)
        else:
            # Server mode: wipe any previous exposure for this country first.
            initializer.exposure_db.remove_exposure(country_iso_code, delete_reference=True)
            initializer.exposure_db.remove_asset_country(country_iso_code)
        initializer.import_exposure(exposure_model, country_iso_code)
        initializer.process_gaps_in_exposure(country_iso_code)
        initializer.exposure_db.commit_and_close()
    logger.info("Execution time: %s" % (datetime.datetime.now() - start_time))
if __name__ == "__main__":
    # Entry point: run the CLI only when executed as a script, not on import.
    command_line_interface()