Commit 4da4c24c authored by Danijel Schorlemmer's avatar Danijel Schorlemmer
Browse files

Implementation of the CSV import

parent ca4585ce
Pipeline #35371 passed with stage
in 1 minute and 50 seconds
......@@ -33,12 +33,13 @@ VERSION_STRING = "losscalculator version 0.1"
def damage_calculator(
exposure_filepath,
fragility_filepath,
fragility_source,
intensity_measure_map_filepath,
taxonomy_map_filepath,
gm_field_filepath,
interpolation_method="linear",
result_filepath="damage_result.csv",
use_xml_fragilities=True,
):
"""
This function computes the probabilities of occurrence (PoO) of damage states for a scenario
......@@ -59,10 +60,11 @@ def damage_calculator(
gm_field = np.loadtxt(gm_field_filepath, delimiter=",", skiprows=1)
exposure = pd.read_csv(exposure_filepath)
fragility_model = FragilityModel(
fragility_filepath,
fragility_source,
gm_field_filepath,
intensity_measure_map_filepath,
taxonomy_map_filepath,
use_xml_fragilities,
)
# Interpolate the ground-motion values for all the assets of the exposure model.
......@@ -129,14 +131,21 @@ def main():
choices=["linear", "cubic", "nearest"],
help="method used to interpolate the input ground-motion values over all "
+ "the exposure locations (asset locations). "
+ "Options: [linear, nearest, cubic] (default set to linear)",
+ "Options: ['linear', 'nearest', 'cubic'] (default set to linear)",
)
parser.add_argument(
"-f",
"--fragilities",
"--fragility-file",
required=False,
type=str,
help="file path to the fragility-function XML file",
help="file path to a fragility-function XML file",
)
parser.add_argument(
"-c",
"--csv-directory",
required=False,
type=str,
help="path name of the directory containing fragility functions as CSV files (*.csv)",
)
parser.add_argument(
"-m",
......@@ -193,7 +202,8 @@ def main():
# read arguments from command line.
command = args.command
interpolation_method = args.interpolation_method
fragility_filepath = args.fragilities
fragility_source = args.fragility_file
csv_directory = args.csv_directory
intensity_measure_map = args.intensity_measure_map
gm_field_filepath = args.ground_motion_field
exposure_filepath = args.exposure
......@@ -217,12 +227,22 @@ def main():
+ "-e/--exposure option to provide an exposure file."
)
exit()
if fragility_filepath is None:
print(
"Error: No fragility functions provided. Please use the "
+ "-f/--fragilities option to provide a fragility-function file."
)
exit()
use_xml_fragilities = True
if fragility_source is not None:
if csv_directory is not None:
print("-f option given, ignoring -c option.")
else:
if csv_directory is not None:
fragility_source = csv_directory
use_xml_fragilities = False
else:
print(
"Error: No fragility functions provided. Please use the "
+ "-f/--fragility-file option to provide a fragility-function file or "
+ "-c/--csv-directory option to provide a directory containing "
+ "fragility functions as CSV files."
)
exit()
if os.path.exists(result_filepath):
if not overwrite_result_file:
print(
......@@ -238,12 +258,13 @@ def main():
damage_calculator(
exposure_filepath,
fragility_filepath,
fragility_source,
intensity_measure_map,
taxonomy_map_filepath,
gm_field_filepath,
interpolation_method,
result_filepath,
use_xml_fragilities,
)
print("Execution time of the script", (datetime.datetime.now() - startTime))
else:
......
import csv
import numpy as np
import xml.etree.ElementTree as ET
import xml.etree.ElementTree as ETree
import glob
from scipy.stats import lognorm
from pathlib import Path
class FragilityModel:
......@@ -12,20 +14,22 @@ class FragilityModel:
def __init__(
self,
fragility_functions_filepath,
fragility_source,
gmf_filepath,
intensity_measure_map_filepath=None,
taxonomy_map_filepath=None,
use_xml_fragilities=True,
):
"""
Initializing the fragility model by importing the XML file with fragility functions
Initializing the fragility model by importing either an XML file with fragility
functions or a set of fragility functions stored in CSV files from a directory
and creating the intensity-measure map that maps intensity-measure types as given in
the fragility functions to the ones in the ground-motion field file. It then creates
a mapping dictionary between the intensity measure types and their respective
column number in the ground-motion field file for faster access to data.
This function is limited to one fragility function per building taxonomy.
Dictionary structure for discrete functions:
Dictionary structure for discrete functions (XML or CSV):
'CR/LFINF+CDL/HBET:7-20/0.0':
{'imv': array([2.225000e-03, 2.397000e-03, 2.581000e-03]),
'slight': array([0.00000e+00, 0.00000e+00, 0.00000e+00]),
......@@ -52,9 +56,10 @@ class FragilityModel:
{'PGA': 2, 'SA(0.3)': 3, 'SA(0.6)': 4, 'SA(1)': 5}
Args:
fragility_functions_filepath (str):
File path to the XML file containing fragility functions in NRML as
provided by the Global Earthquake Model.
fragility_source (str):
File path of an XML file containing fragility functions in NRML as
provided by the Global Earthquake Model or a file search pattern
for CSV files containing fragility functions (e.g.: 'csvfiles/*.csv')
gmf_filepath (str):
File path to the ground-motion field.
......@@ -92,48 +97,103 @@ class FragilityModel:
CR/LFM+CDN/H:1,CR_LFM-CDN_H1_0
...
"""
use_xml_fragilities (boolean)
If set (default), an XML file is expected in fragility_source, otherwise
the file search pattern for CSV files.
tree = ET.parse(fragility_functions_filepath)
root = tree.getroot()[0]
assert "fragilityModel" in root.tag, (
"%s does not contain a recognized fragility model" % fragility_functions_filepath
)
"""
self.asset_category = root.attrib["assetCategory"]
self.loss_category = root.attrib["lossCategory"]
self.fragility_functions = {}
self.gm_type_column_map = {}
self.taxonomy_map = {}
self.limit_states = None
self.num_limit_states = 0
for child in root:
if "description" in child.tag:
self.description = child.text
elif "limitStates" in child.tag:
self.limit_states = child.text.split()
self.num_limit_states = len(self.limit_states)
elif "fragilityFunction" in child.tag:
limit_states = []
# Intensity-measure type
im_type = None
parameters = {}
# Import a discrete function
if child.get("format") == "discrete":
# Intensity-measure values and probabilities of exceedance
for ichild in list(child):
key, value = ichild.items()[0]
if key == "imt":
im_type = value
im_value = np.array(list(map(float, ichild.text.split())))
parameters["imv"] = im_value
if key == "ls":
ls = value
values = np.array(list(map(float, ichild.text.split())))
parameters[ls] = values
limit_states.append(value)
parameters["format"] = child.get("format")
parameters["imt"] = im_type
# Use XML file
if use_xml_fragilities:
tree = ETree.parse(fragility_source)
root = tree.getroot()[0]
assert "fragilityModel" in root.tag, (
"%s does not contain a recognized fragility model" % fragility_source
)
self.asset_category = root.attrib["assetCategory"]
self.loss_category = root.attrib["lossCategory"]
for child in root:
if "description" in child.tag:
self.description = child.text
elif "limitStates" in child.tag:
self.limit_states = child.text.split()
self.num_limit_states = len(self.limit_states)
elif "fragilityFunction" in child.tag:
limit_states = []
# Intensity-measure type
im_type = None
parameters = {}
# Import a discrete function
if child.get("format") == "discrete":
# Intensity-measure values and probabilities of exceedance
for ichild in list(child):
key, value = ichild.items()[0]
if key == "imt":
im_type = value
im_value = np.array(list(map(float, ichild.text.split())))
parameters["imv"] = im_value
if key == "ls":
ls = value
values = np.array(list(map(float, ichild.text.split())))
parameters[ls] = values
limit_states.append(value)
parameters["format"] = child.get("format")
parameters["imt"] = im_type
if im_type is None:
raise ValueError(
"No intensity measure given for fragility function %s"
% (child.get("id"))
)
if not (limit_states == self.limit_states):
raise ValueError(
"Limit states for fragility function"
+ "%s differ from global limit states %s"
% (child.get("id"), self.limit_states)
)
# Import a continuous function
elif child.get("format") == "continuous":
parameters["format"] = child.get("format")
if child.get("shape") == "logncdf":
for ichild in list(child):
if "imls" in ichild.tag:
for key, value in ichild.items():
parameters[key] = value
if key == "imt":
im_type = value
elif "params" in ichild.tag:
ls_params = {}
limit_state = None
for key, value in ichild.items():
if key == "ls":
limit_state = value
limit_states.append(value)
else:
ls_params[key] = float(value)
variance = ls_params["stddev"] ** 2.0
sigma = np.sqrt(
np.log((variance / (ls_params["mean"] ** 2.0)) + 1.0)
)
mu = (ls_params["mean"] ** 2.0) / np.sqrt(
variance + ls_params["mean"] ** 2.0
)
ls_params["function"] = lognorm(sigma, scale=mu)
parameters[limit_state] = ls_params
else:
raise ValueError(
"Unknown function form %s of fragility function %s"
% (child.get("shape"), child.get("id"))
)
# Final checks to make sure the fragility function is valid
if im_type is None:
raise ValueError(
"No intensity measure given for fragility function %s"
......@@ -145,55 +205,42 @@ class FragilityModel:
+ "%s differ from global limit states %s"
% (child.get("id"), self.limit_states)
)
self.fragility_functions[child.get("id")] = parameters
# Import a continuous function
elif child.get("format") == "continuous":
parameters["format"] = child.get("format")
if child.get("shape") == "logncdf":
for ichild in list(child):
if "imls" in ichild.tag:
for key, value in ichild.items():
parameters[key] = value
if key == "imt":
im_type = value
elif "params" in ichild.tag:
ls_params = {}
limit_state = None
for key, value in ichild.items():
if key == "ls":
limit_state = value
limit_states.append(value)
else:
ls_params[key] = float(value)
variance = ls_params["stddev"] ** 2.0
sigma = np.sqrt(
np.log((variance / (ls_params["mean"] ** 2.0)) + 1.0)
)
mu = (ls_params["mean"] ** 2.0) / np.sqrt(
variance + ls_params["mean"] ** 2.0
)
ls_params["function"] = lognorm(sigma, scale=mu)
parameters[limit_state] = ls_params
else:
raise ValueError(
"Unknown function form %s of fragility function %s"
% (child.get("shape"), child.get("id"))
)
# Use CSV files
else:
self.asset_category = "buildings"
self.loss_category = "structural"
self.description = ""
for ff_filepath in glob.glob(fragility_source):
limit_states = []
parameters = {"format": "discrete"}
taxonomy = Path(ff_filepath).stem
with open(ff_filepath) as f:
line = f.readline()
line_list = line.split(",")
parameters["imt"] = line_list[0]
values = np.array(list(map(float, line_list[1:])))
parameters["imv"] = values
for line in f:
line_list = line.split(",")
limit_states.append(line_list[0])
values = np.array(list(map(float, line_list[1:])))
parameters[line_list[0]] = values
# Final checks to make sure the fragility function is valid
if im_type is None:
raise ValueError(
"No intensity measure given for fragility function %s"
% (child.get("id"))
)
if not (limit_states == self.limit_states):
raise ValueError(
"Limit states for fragility function"
+ "%s differ from global limit states %s"
% (child.get("id"), self.limit_states)
)
self.fragility_functions[child.get("id")] = parameters
if self.limit_states is None:
self.limit_states = limit_states
self.num_limit_states = len(self.limit_states)
else:
if not (limit_states == self.limit_states):
raise ValueError(
"Limit states for fragility function"
+ "%s differ from global limit states %s" % taxonomy,
self.limit_states,
)
self.fragility_functions[taxonomy] = parameters
# Read the intensity measure map into a dictionary
if intensity_measure_map_filepath is not None:
......@@ -289,8 +336,7 @@ class FragilityModel:
column = self.gm_type_column_map.get(im_type, None)
if column is None:
raise ValueError(
"Necessary intensity measure type %s not given in ground-motion file."
% (im_type)
"Necessary intensity measure type %s not given in ground-motion file." % im_type
)
im_value = intensity_measure_values[column]
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment