Commit f4fb0529 authored by g-weatherill's avatar g-weatherill
Browse files

Adds ShakemapWorkflowResults object for storing workflow results

parent 25e60331
Pipeline #23226 passed with stage
in 9 minutes and 51 seconds
......@@ -220,7 +220,6 @@ def fetch_quakeml(path: str) -> Union[dict, None]:
print("Oops, no preferredOriginID was found")
return None
origin = fetch_origin(root, ns, preferredoriginID)
print(origin)
(d, t) = origin.pop("time").split("T", 2)
# There's little point in forcing these into datetimes,
# since writing to YAML requires they be converted back
......
......@@ -32,6 +32,26 @@ DEFAULT_CONFIGURATION = {
}
class ShakemapWorkflowResult(object):
"""
Object to hold output information from shakemap calculations, including
configuration metadata
"""
__slots__ = [
"shakemaps",
"contours",
"num_sites",
"bbox_properties",
"ground_motion_models",
"tectonic_region",
"earthquake",
]
def __getitem__(self, key):
return getattr(self, key)
def shakemaps_from_quakeml(
event_id: str,
imts: List = ["PGA", "SA(0.3)", "SA(1.0)"],
......@@ -67,8 +87,10 @@ def shakemaps_from_quakeml(
if os.path.exists(export_folder):
raise IOError("Designated export folder %s already exists!" % export_folder)
os.mkdir(export_folder)
# Create the event from the GEOFON event ID (or the path to the QuakeML file)
earthquake = Earthquake.from_quakeml(event_id)
results = ShakemapWorkflowResult()
results.earthquake = Earthquake.from_quakeml(event_id)
# Split the configuration into those parts relating to the bounding box,
# the site model and the shakemap
bbox_config = {}
......@@ -87,13 +109,15 @@ def shakemaps_from_quakeml(
]:
site_config[key] = config.get(key, DEFAULT_CONFIGURATION[key])
# Build the site model
bbox = earthquake.get_maximum_distance_bbox(bbox_config["max_distance_bbox"])
bbox = results.earthquake.get_maximum_distance_bbox(bbox_config["max_distance_bbox"])
vs30 = site_config.pop("default_vs30")
site_model = SiteModel.from_bbox(
bbox, bbox_config["spcx"], bbox_config["spcy"], vs30, **site_config
)
results.num_sites = len(site_model)
results.bbox_properties = site_model.bbox_properties
# Get the ground motion models
tect_region, gsims = regionalization(earthquake)
results.tectonic_region, results.ground_motion_models = regionalization(results.earthquake)
shakemap_config = {}
for key in [
"num_rupture_samples",
......@@ -105,40 +129,45 @@ def shakemaps_from_quakeml(
shakemap_config[key] = config.get(key, DEFAULT_CONFIGURATION[key])
# Run the shakemap
shakemap = Shakemap(
earthquake, site_model, gsims, tect_region, cache_file=cache_file, **shakemap_config
results.earthquake,
site_model,
results.ground_motion_models,
results.tectonic_region,
cache_file=cache_file,
**shakemap_config
)
mean_shakemaps, stddev_shakemaps, _ = shakemap.get_shakemap(imts)
# Export to file (if an export directory is given) or to a dictionary of byte arrays
output_shakemaps = {"mean": {}, "stddevs": {}}
output_contours = {"mean": {}, "stddevs": {}}
results.shakemaps = {"mean": {}, "stddevs": {}}
results.contours = {"mean": {}, "stddevs": {}}
for imt in imts:
output_shakemaps["mean"][imt] = shakemap.to_geotiff(mean_shakemaps, imt)
output_shakemaps["stddevs"][imt] = shakemap.to_geotiff(
results.shakemaps["mean"][imt] = shakemap.to_geotiff(mean_shakemaps, imt)
results.shakemaps["stddevs"][imt] = shakemap.to_geotiff(
stddev_shakemaps, imt, is_stddev=True
)
output_contours["mean"][imt] = shakemap.get_contours(
results.contours["mean"][imt] = shakemap.get_contours(
imt, mean_shakemaps, contour_levels_mean
)
output_contours["stddevs"][imt] = shakemap.get_contours(
results.contours["stddevs"][imt] = shakemap.get_contours(
imt, stddev_shakemaps, contour_levels_stddev, is_stddev=True
)
if export_folder:
filestem = "{:s}_{:s}".format(earthquake.id, imt)
filestem = "{:s}_{:s}".format(results.earthquake.id, imt)
# Export the bytes to raster files
fname_raster_mean = os.path.join(export_folder, filestem + "_mean.tif")
with open(fname_raster_mean, "wb") as f:
f.write(output_shakemaps["mean"][imt])
f.write(results.shakemaps["mean"][imt])
fname_raster_stddev = os.path.join(export_folder, filestem + "_stddev.tif")
with open(fname_raster_stddev, "wb") as f:
f.write(output_shakemaps["stddevs"][imt])
f.write(results.shakemaps["stddevs"][imt])
# Export the contour dataframes to geojson
fname_contour_mean = os.path.join(export_folder, filestem + "_contour_mean.geojson")
output_contours["mean"][imt].to_file(fname_contour_mean, driver="GeoJSON")
if output_contours["stddevs"][imt].shape[0]:
results.contours["mean"][imt].to_file(fname_contour_mean, driver="GeoJSON")
if results.contours["stddevs"][imt].shape[0]:
# If all the sites have the same standard deviation then skip this as the
# contours will yield an empty dataframe
fname_contour_stddev = os.path.join(
export_folder, filestem + "_contour_stddev.geojson"
)
output_contours["stddevs"][imt].to_file(fname_contour_stddev, driver="GeoJSON")
return output_shakemaps, output_contours
results.contours["stddevs"][imt].to_file(fname_contour_stddev, driver="GeoJSON")
return results
......@@ -4,7 +4,7 @@ Test cases for end-to-end workflows
import os
import unittest
from geopandas import GeoDataFrame
from shakyground2.workflows import shakemaps_from_quakeml
from shakyground2.workflows import shakemaps_from_quakeml, ShakemapWorkflowResult
DATA_PATH = os.path.join(os.path.dirname(__file__), "data")
......@@ -21,33 +21,34 @@ class GeofonWorkflowTestCase(unittest.TestCase):
def test_complete_workflow_no_export_default_config(self):
event_id = os.path.join(DATA_PATH, "gfz2021eksc.xml")
output_shakemaps, output_contours = shakemaps_from_quakeml(event_id, self.imts)
self.assertListEqual(list(output_shakemaps), ["mean", "stddevs"])
self.assertListEqual(list(output_shakemaps["mean"]), self.imts)
results = shakemaps_from_quakeml(event_id, self.imts)
self.assertIsInstance(results, ShakemapWorkflowResult)
self.assertListEqual(list(results.shakemaps), ["mean", "stddevs"])
self.assertListEqual(list(results.shakemaps["mean"]), self.imts)
for imt in self.imts:
for maptype in output_shakemaps:
self.assertIsInstance(output_shakemaps[maptype][imt], bytes)
self.assertIsInstance(output_contours["mean"][imt], GeoDataFrame)
for maptype in results.shakemaps:
self.assertIsInstance(results.shakemaps[maptype][imt], bytes)
self.assertIsInstance(results.contours["mean"][imt], GeoDataFrame)
def test_complete_workflow_no_export(self):
event_id = os.path.join(DATA_PATH, "gfz2021eksc.xml")
output_shakemaps, output_contours = shakemaps_from_quakeml(
event_id, self.imts, config={"spcx": 0.1, "spcy": 0.1}
)
self.assertListEqual(list(output_shakemaps), ["mean", "stddevs"])
self.assertListEqual(list(output_shakemaps["mean"]), self.imts)
results = shakemaps_from_quakeml(event_id, self.imts, config={"spcx": 0.1, "spcy": 0.1})
self.assertListEqual(list(results.shakemaps), ["mean", "stddevs"])
self.assertListEqual(list(results.shakemaps["mean"]), self.imts)
self.assertIsInstance(results, ShakemapWorkflowResult)
for imt in self.imts:
for maptype in output_shakemaps:
self.assertIsInstance(output_shakemaps[maptype][imt], bytes)
self.assertIsInstance(output_contours["mean"][imt], GeoDataFrame)
for maptype in results.shakemaps:
self.assertIsInstance(results.shakemaps[maptype][imt], bytes)
self.assertIsInstance(results.contours["mean"][imt], GeoDataFrame)
def test_workflow_with_exports(self):
test_folder = os.path.join(DATA_PATH, "tmp_shakemaps_from_quakeml")
event_id = os.path.join(DATA_PATH, "gfz2021eksc.xml")
output_shakemaps, output_contours = shakemaps_from_quakeml(
results = shakemaps_from_quakeml(
event_id, self.imts, config={"spcx": 0.1, "spcy": 0.1}, export_folder=test_folder
)
self.assertIsInstance(results, ShakemapWorkflowResult)
# Verify that the files have been created
self.assertTrue(os.path.exists(os.path.join(test_folder, "gfz2021eksc_PGA_mean.tif")))
self.assertTrue(os.path.exists(os.path.join(test_folder, "gfz2021eksc_PGA_stddev.tif")))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment