Commit 36396e58 authored by Danijel Schorlemmer's avatar Danijel Schorlemmer Committed by Tara Evaz Zadeh
Browse files

Added the FragilityModel class to handle XML-formatted fragility functions

parent 016ff55b
Pipeline #29896 passed with stage
in 3 minutes and 32 seconds
import csv
import numpy as np
import xml.etree.ElementTree as ET
class FragilityModel:
"""
General class to handle a complete fragility model as a set of fragility functions
for different structural types
"""
def __init__(self, fragility_functions_filepath, intensity_measure_map_filepath):
"""
Initializing the fragility model by importing the XML file with fragility functions
and creating the intensity-measure map that maps intensity types from the
ground-motion field to intensity types as given in the fragility functions.
This function currently only supports `discrete` fragility functions.
Another limitation is that for a building taxonomy only one fragility function
can be used.
Args:
fragility_functions_filepath (str):
File path to the XML file containing fragility functions in NRML as
provided by the Global Earthquake Model.
intensity_measure_map_filepath (str):
File path to a CSV file mapping the intensity types from the
ground-motion field to intensity types as given in the fragility functions.
If `None`, a default mapping will be applied.
"""
tree = ET.parse(fragility_functions_filepath)
root = tree.getroot()[0]
assert "fragilityModel" in root.tag, (
"%s does not contain a recognized fragility model" % fragility_functions_filepath
)
self.asset_category = root.attrib["assetCategory"]
self.loss_category = root.attrib["lossCategory"]
self.fragility_functions = {}
self.gm_type_column_map = {}
self.limit_states = None
self.num_limit_states = 0
for child in root:
if "description" in child.tag:
self.description = child.text
elif "limitStates" in child.tag:
self.limit_states = child.text.split()
self.num_limit_states = len(self.limit_states)
elif "fragilityFunction" in child.tag:
limit_states = []
# Intensity-measure values and probabilities of exceedance
imv_poe = {}
# Intensity-measure type
im_type = None
for ichild in list(child):
key, value = ichild.items()[0]
if key == "imt":
im_type = value
im_value = np.array(list(map(float, ichild.text.split())))
imv_poe["imv"] = im_value
if key == "ls":
ls = value
values = np.array(list(map(float, ichild.text.split())))
imv_poe[ls] = values
limit_states.append(value)
imv_poe["format"] = child.get("format")
imv_poe["imt"] = im_type
if im_type is None:
raise ValueError(
"No intensity measure given for fragility function %s"
% (child.get("id"))
)
if not (limit_states == self.limit_states):
raise ValueError(
"Limit states for fragility function"
+ "%s differ from global limit states %s"
% (child.get("id"), self.limit_states)
)
self.fragility_functions[child.get("id")] = imv_poe
# Read the intensity measure map into a dictionary
if intensity_measure_map_filepath is not None:
with open(intensity_measure_map_filepath, mode="r") as input_file:
reader = csv.reader(input_file)
next(reader)
self.intensity_measure_map = {rows[0]: rows[1] for rows in reader}
else:
self.intensity_measure_map = {
"gmv_SA(0.3)": "SA(0.3)",
"gmv_SA(0.6)": "SA(0.6)",
"gmv_SA(1.0)": "SA(1)",
"gmv_PGA": "PGA",
}
def __repr__(self):
# Informative string representation
return "Fragility Model: %s\nLimit States: %s" % (
self.description,
" ".join(self.limit_states),
)
def __len__(self):
# Return the number of fragility functions in the model
return len(self.fragility_functions)
def get_imt(self, taxonomy):
return self.fragility_functions[taxonomy]
def create_gmf_column_map(self, gmf_filepath):
"""
Creates a mapping dictionary between the intensity measure types and their respective
column number in the ground-motion field file.
Mapping dictionary example:
{'PGA': 2, 'SA(0.3)': 3, 'SA(0.6)': 4, 'SA(1)': 5}
Args:
gmf_filepath (str):
File path to the ground-motion field
(Header line contains the intensity-measure names)
File example extract:
lon,lat,gmv_PGA,gmv_SA(0.3),gmv_SA(0.6),gmv_SA(1.0)
22.89028,36.31806,4.822530E-03,1.255729E-02,8.736364E-03,5.612176E-03
22.89028,36.32083,4.830462E-03,1.257595E-02,8.748890E-03,5.619673E-03
...
"""
# Extracting the ground-motion types available in the ground-motion field.
with open(gmf_filepath) as gmf:
im_types = gmf.readline().strip().split(",")[2:]
# Add for each intensity measure from the ground-motion field the respective
# column number to the dictionary. Hereby, using the intensity-measure map to
# translate between the name in the ground-motion field and the name in
# the fragility functions.
for i in range(len(im_types)):
self.gm_type_column_map[self.intensity_measure_map[im_types[i]]] = i + 2
def __call__(self, taxonomy, intensity_measure_values):
"""
Computing PoE values for all the different damage states available in the
fragility functions, considering the ground-motion value.
Args:
taxonomy (str):
Taxonomy string of an asset
Example:
>>> taxonomy
'CR/LDUAL+CDM/HBET:6-/11.0'
intensity_measure_values (list):
Full row of the ground-motion field for one asset represented as a list
containing the lognitude, latitude, and all avaialble intensity measure
values.
Example:
>>> intensity_measure_values
[22.89028,36.31806,4.822530E-03,1.255729E-02,8.736364E-03,5.612176E-03]
Returns:
poes (list):
A list with element as probabilities of exceedance.
Example:
>>> poes
[0.73, 0.52, 0.32, 0.18]
poos (list):
A list with element as probabilities of occurence.
Example:
>>> poos
[0.27, 0.21, 0.2 , 0.14, 0.18]
im_value (float):
Intensity-measure value used to compute the PoEs.
im_type (str):
Intensity-measure type used to compute the PoEs.
"""
# Retrieve the fragility function matching the taxonomy
ff = self.fragility_functions[taxonomy]
if ff["format"] == "discrete":
# Process a discrete fragility function
im_type = ff["imt"]
column = self.gm_type_column_map.get(im_type, None)
if column is None:
raise ValueError(
"Necessary intensity measure type %s not given in ground-motion file."
% (im_type)
)
im_value = intensity_measure_values[column]
if im_value <= ff["imv"][0]:
# The intensity-measure value is smaller than or equal to the minimum
# intensity-measure value of the fragility function
poes = [1.0]
poes.extend([0.0] * self.num_limit_states)
poos = list(poes)
elif im_value >= ff["imv"][-1]:
# The intensity-measure value is larger than or equal to the maximum
# intensity-measure value of the fragility function
poes = [1.0]
for ls in self.limit_states:
poes.append(ff[ls][-1])
poos = list(-1 * np.diff(poes))
poos.append(poes[-1])
else:
# The intensity-measure value is in the range of defined
# intensity-measure values of the fragility function
# Determining iml that is smaller than the given ground-motion value
iml_min = ff["imv"][ff["imv"] < im_value].max()
# Finding index of the iml_min
index_iml_min = np.searchsorted(ff["imv"], iml_min, side="left")
# Smallest iml that is larger than the given ground-motion value
iml_max = ff["imv"][ff["imv"] > im_value].min()
# Finding index of the iml_max
index_iml_max = np.searchsorted(ff["imv"], iml_max, side="left")
# Two bounds of the interpolation
x = [iml_min, iml_max]
# Prepare return value
poes = [1.0]
# Looping over all limit states
for ls in self.limit_states:
y = [ff[ls][index_iml_min], ff[ls][index_iml_max]]
# Interpolating PoE values for the given gm-value between x and y
# range (using interpolation)
poes.append(np.interp(im_value, x, y))
# Calculating PoOs
# pylint: disable=E1130
poos = list(-1 * np.diff(poes))
poos.append(poes[-1])
return poes[1:5], poos, im_value, im_type
......@@ -19,111 +19,80 @@
import os
import argparse
import csv
import numpy as np
import pandas as pd
import losslib
import datetime
import FragilityModel
def damage_calculator(
exposure_filepath,
fragility_pathname,
taxonomy_conversion_filepath,
fragility_filepath,
intensity_measure_map,
gm_field_filepath,
interpolation_method="linear",
result_filepath="damage_result.csv",
):
"""
This function computes the probabilities of occurrence (PoE) of damage states for a scenario
earthquakes, given a ground-motion field, an exposure model representing the assets in the
This function computes the probabilities of occurrence (PoO) of damage states for a scenario
earthquake, given a ground-motion field, an exposure model representing the assets in the
region of interest and fragility functions that model the probability of exceeding defined
damage states given a set of ground-motion values of a specific type.
Please read the manual for more information on the inputs.
"""
# Initialize variables.
all_im_values = []
all_im_types = []
all_poes = []
all_poos = []
damage_by_assets = []
# Read inputs.
taxonomy_to_fragility_source = csv.reader(open(taxonomy_conversion_filepath))
# Skipping the header.
next(taxonomy_to_fragility_source)
gm_field = np.loadtxt(gm_field_filepath, delimiter=",", skiprows=1)
exposure = pd.read_csv(exposure_filepath)
esrm20_fragility_model = FragilityModel.FragilityModel(
fragility_filepath, intensity_measure_map
)
esrm20_fragility_model.create_gmf_column_map(gm_field_filepath)
# Interpolate the ground-motion values for all the assets of the exposure model.
# pylint: disable=E1101
full_gm_field = losslib.get_full_gm_field(
gm_field, exposure.lon, exposure.lat, interpolation_method
)
# Creating an empty dictionary to later fill with ground-motion types and their column
# numbers (as they appear in the ground-motion-field file) as its key and value.
gm_type_index_map = {}
with open(gm_field_filepath) as gmf:
# Extracting the ground-motion types available in the ground-motion-field file. Note
# the ground-motion-field file format:
# `[lon, lat, gmValueofType1, ..., gmValueofTypeN]`
gm_types = gmf.readline().strip().split(",")[2:]
for i in range(len(gm_types)):
# Appending each `gm_type` and its column number (as it appears in the
# ground-motion-field file) to the `gm_type_index_map` as the dictionary
# key and value, respectively.
gm_type_index_map[gm_types[i]] = i + 2
# Calling the function "taxonomy_to_fragility" to get a dictionary with keys as the
# taxonomy and the values as both the fragility function name (excluding the ".csv" part)
# and the column number of the respective ground-motion type in `ground-motion-field` file.
taxonomy_to_fragility_map = losslib.taxonomy_to_fragility(
gm_type_index_map, taxonomy_to_fragility_source, fragility_pathname
)
gm_values = []
all_gm_types = []
all_PoEs = []
all_PoOs = []
damage_by_assets = []
# Going through each asset to do the damage calculation.
# pylint: disable=E1136
for asset in range(exposure.shape[0]):
taxonomy = exposure["taxonomy"][asset]
fragilityfunction_filename = taxonomy_to_fragility_map[taxonomy][0] + ".csv"
gm_type = taxonomy_to_fragility_map[taxonomy][2]
num_buildings = exposure["number"][asset]
# Read fragility functions as numpy arrays.
fragility_function = np.loadtxt(
fragility_pathname + "/" + fragilityfunction_filename,
delimiter=",",
usecols=range(1, 101),
)
# Computing the ground motions for each asset (also for duplicate locations).
gm_value = full_gm_field[asset, taxonomy_to_fragility_map[taxonomy][1]]
gm_values.append(gm_value)
all_gm_types.append(gm_type)
# Computing probabilities of exceedance and occurrence.
[PoEs, PoOs] = losslib.get_PoEs(fragility_function, gm_value)
all_PoEs.append(PoEs)
all_PoOs.append(PoOs)
# Compute damage by assets
damage_by_asset = [i * num_buildings for i in PoOs]
damage_by_assets.append(damage_by_asset)
# Append results
exposure["gm_value"] = gm_values
exposure["gm_type"] = all_gm_types
exposure["PoES"] = [all_PoEs for i in exposure.index][i]
exposure["PoOS"] = [all_PoOs for i in exposure.index][i]
exposure["structural_no_damage"] = [row[0] for row in damage_by_assets]
exposure["structural_slight"] = [row[1] for row in damage_by_assets]
exposure["structural_moderate"] = [row[2] for row in damage_by_assets]
exposure["structural_extensive"] = [row[3] for row in damage_by_assets]
exposure["structural_complete"] = [row[4] for row in damage_by_assets]
poes, poos, im_value, im_type = esrm20_fragility_model(taxonomy, full_gm_field[asset])
all_poes.append(poes)
all_poos.append(poos)
all_im_values.append(im_value)
all_im_types.append(im_type)
# Compute damage by assets.
damage_by_assets.append([i * num_buildings for i in poos])
# Append results.
# pylint: disable=E1137
exposure["gm_value"] = all_im_values
exposure["gm_type"] = all_im_types
exposure["PoEs"] = all_poes
exposure["PoOs"] = all_poos
exposure[esrm20_fragility_model.loss_category + "_no_damage"] = [
row[0] for row in damage_by_assets
]
for i in range(len(esrm20_fragility_model.limit_states)):
exposure[
esrm20_fragility_model.loss_category + "_" + esrm20_fragility_model.limit_states[i]
] = [row[(i + 1)] for row in damage_by_assets]
exposure.to_csv(result_filepath, index=False)
......@@ -131,7 +100,7 @@ def damage_calculator(
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="This program computes the probabilities of occurence of different "
description="This program computes the probabilities of occurrence of different "
+ "damage states for a scenario earthquake, given a ground-motion field "
+ "for the area, an exposure model representing the assets in the region "
+ "of interest and fragility functions that show the probability of "
......@@ -153,16 +122,16 @@ if __name__ == "__main__":
"--fragilities",
required=True,
type=str,
help="path to the directory that includes all the fragility csv file functions "
+ "(Required)",
help="file path to the fragility-function XML file (Required)",
)
parser.add_argument(
"-t",
"--taxonomy-map",
required=True,
"-m",
"--intensity-measure-map",
required=False,
type=str,
help="path to the file that includes the taxonomy names and their respective "
+ "fragility function names (Required)",
help="file path to the file that includes the mapping from the intensity-measure "
+ "names in the ground-motion field to the intensity-measure name used in the "
+ "fragility functions. If not provided, a default set will be used",
)
parser.add_argument(
"-g",
......@@ -198,10 +167,10 @@ if __name__ == "__main__":
)
args = parser.parse_args()
# read arguments from command line
# read arguments from command line.
interpolation_method = args.interpolation_method
fragility_pathname = args.fragilities
taxonomy_conversion_filepath = args.taxonomy_map
fragility_filepath = args.fragilities
intensity_measure_map = args.intensity_measure_map
gm_field_filepath = args.ground_motion_field
exposure_filepath = args.exposure
result_filepath = args.results
......@@ -221,8 +190,8 @@ if __name__ == "__main__":
damage_calculator(
exposure_filepath,
fragility_pathname,
taxonomy_conversion_filepath,
fragility_filepath,
intensity_measure_map,
gm_field_filepath,
interpolation_method,
result_filepath,
......
......@@ -19,9 +19,6 @@
import numpy as np
from scipy.interpolate import griddata
import csv
from scipy import interpolate
import re
def get_full_gm_field(gm_field, lons, lats, method="linear"):
......@@ -104,236 +101,3 @@ def get_full_gm_field(gm_field, lons, lats, method="linear"):
gm_value_griddata = griddata(points_given, gmvs_given, points_todo, method=method)
full_gm_field = np.column_stack((full_gm_field, gm_value_griddata))
return full_gm_field
def find_SA_frequency(SA_gm):
"""
Among the ground-motion types, SA-type naming includes an additional number
in parentheses indicating the frequency. This function extracts this
number as float from the given SA ground-motion as a string. The number is given as a list
of a single string, so it is extracted using the index ([o]) and then converted to float.
Input:
------
- SA_gm: (string)
Example extract:
>>> SA_gm
'SA(1)'
Output:
------
- SA_frequency: (float)
Example extract:
>>> SA_frequency
1.0
"""
SA_frequency = float(re.findall(r"\d+\.\d+|\d+", SA_gm)[0])
return SA_frequency
def taxonomy_to_fragility(gm_type_index_map, taxonomy_to_fragility_source, fragility_pathname):
"""
Creates an extended map of taxonomies to fragility function.
The input map 'taxonomy_to_fragility_source' contains the mapping for each
taxonomy to a fragility function file from which the ground-motion type is
read to be written to the extended map 'taxonomy_to_fragility_map'.
Input:
------
- gm_type_index_map: (dictionary)
key: ground-motion type; value: column number in the ground-motion
field file
Example extract:
>>> gm_type_index_map
{'gmv_PGA': 2, 'gmv_SA(0.3)': 3, 'gmv_SA(0.6)': 4, 'gmv_SA(1)': 5}
- taxonomy_to_fragility_source: (csv.reader)
taxonomy to fragility-function file map following the format:
[taxonomy_string, fragility-function_filename, 'weight'], [...]
Example file extract:
['CR/LDUAL+CDM/HBET:6-/11.0', 'CR_LDUAL-DUM_H6', '1']
['CR/LDUAL+CDM/HBET:6-/5.0', 'CR_LDUAL-DUL_H6', '1']
['CR/LDUAL+CDM/HBET:6-/SOS/11.0', 'CR_LDUAL-DUL_H6', '1']
...
- fragility_pathname: (string)
directory of fragility-function files
Example extract:
>>> fragility_pathname
'/home/laptop/fragilities'
Output:
------
- taxonomy_to_fragility_map: (Dictionary)
containing the taxonomy to fragility function map considering the
ground-motion types. It follows the format:
{taxonomy_string: [fragility-function_filename, column of
ground-motion_type in ground-motion_field file]}
Example extract:
{'CR/LDUAL+CDM/HBET:6-/11.0': ['CR_LDUAL-DUM_H6', 4],
'CR/LDUAL+CDM/HBET:6-/5.0': ['CR_LDUAL-DUL_H6', 4],
'CR/LDUAL+CDM/HBET:6-/SOS/11.0': ['CR_LDUAL-DUL_H6', 4], ... }
"""
# Prepare return variable
taxonomy_to_fragility_map = {}
# List of ground-motion types in the ground-motion field file
gm_types_from_gm_field = list(gm_type_index_map.keys())
# List of ground-motion types column numbers in the groun-motion field file
gm_type_column_number_from_gm_field = list(gm_type_index_map.values())
# Loop through the taxonomy-to-fragility-function map
for map_entry in taxonomy_to_fragility_source:
# Open the fragility-function file corresponding to the taxonomy in 'map_entry[1]'
fragility_function = list(
csv.reader(open(fragility_pathname + "/" + map_entry[1] + ".csv"))
)
# Check if already one fragility function for a given GM type has been selected
if map_entry[0] in taxonomy_to_fragility_map:
# Ignore the additional fragility function to keep everything unambiguous
continue
fragility_function_gmt = fragility_function[0][0]
# Detecting if the ground-motion type in the fragility functions is of type `SA`.
if re.search("SA", fragility_function_gmt):
# Extracting the SA frequency of the fragility function).
SA_type_in_fragility_function = find_SA_frequency(fragility_function_gmt)
# Looping through the gm_type_index_map dictionary keys which are
# the ground-motion types of the ground-motion field file
for gm_type in gm_type_index_map:
# Considering only ground-motion types of SA kind.
if gm_type.startswith("gmv_SA("):
SA_type_in_gm_type_index_map = find_SA_frequency(gm_type)
# Checking if the SA frequency is the same in both
# fragility function and the ground-motion field file
if SA_type_in_gm_type_index_map == SA_type_in_fragility_function:
# Taking the column number of the ground-motion type (of the fragility
# function) in the ground-motion field file.
gm_type_column_number = gm_type_index_map[gm_type]
fragility_function_gm_type = gm_types_from_gm_field[
gm_type_column_number_from_gm_field.index(gm_type_column_number)
]
break
else:
# The header of the ground-motion-field file contains a `gmv_` prefix which
# is kept to stay compatible with `OpenQuake`.
gm_type_column_number = gm_type_index_map["gmv_" + fragility_function_gmt]
fragility_function_gm_type = gm_types_from_gm_field[
gm_type_column_number_from_gm_field.index(gm_type_column_number)
]
taxonomy_to_fragility_map[map_entry[0]] = [
map_entry[1],
gm_type_column_number,
fragility_function_gm_type,
]
return taxonomy_to_fragility_map
def get_PoEs(fragility_function, gm_value):
"""
This function interpolates PoE values (for slight,moderate,extensive and
complete damages (Lines 2, 3, 4 and 5 of the fragility function
respectively)), considering the ground-motion value (in the first line of
the fragility function).
Input:
------
- gm_value : (int OR float)
value of the ground-motion.
Example extract:
>>> gm_value
2.152E-02
- fragility_function: (Numpy nd-array of shape (5,100) in our case;
An array with different levels of a ground_motion type as its first record
and probabilities of exceeding slight, moderate, extensive and complete damage states
corresponding to each level as the 2nd to 5th record.
Example extract:
>>> fragility_function
array([
[5.0e-02, 5.2-02, 5.4-02, 5.6e-02, ..., 3.414451e+00],
[0.0e+00, 0.0e+00, 0.0e+00, 0.0e+00, ..., 8.878820e-01],
[0.0e+00, 0.0e+00, 0.0e+00, 0.0e+00, ..., 4.696100e-01],
[0.0e+00, 0.0e+00, 0.0e+00, 0.0e+00, ..., 2.454840e-01],
[0.0e+00, 0.0e+00, 0.0e+00, 0.0e+00, ..., 1.368970e-01]
])
Output: