Commit afcb1318 authored by g-weatherill's avatar g-weatherill
Browse files

Merge remote-tracking branch 'upstream/master'

parents 415ff4eb eb8b533e
Pipeline #22136 passed with stage
in 8 minutes and 39 seconds
......@@ -14,6 +14,7 @@ setup(
"openquake.engine",
"geopandas",
"Rtree",
"rasterio",
],
packages=find_packages(),
python_requires=">=3.7",
......
"""
Implements the core shakemap class
"""
import io
import h5py
import numpy as np
import warnings
from typing import Dict, Optional, Tuple, List
import rasterio
import geopandas as gpd
import matplotlib.pyplot as plt
from typing import Dict, Optional, Tuple, List, Union
from shapely.geometry import LineString, MultiLineString
from openquake.hazardlib import const, imt
from openquake.hazardlib.contexts import ContextMaker
from shakyground2 import valid
......@@ -286,3 +291,229 @@ class Shakemap(object):
"stddev", aggregated_stddevs.shape, shakemap_dtypes
)
aggregated_stddev_dset[:] = aggregated_stddevs
def _transform_to_raster(
self, imt: str, shakemap: np.ndarray, is_stddev: bool = False
) -> np.ndarray:
"""
Transforms a shakemap output into a 2D array for raster export based
on the bounding box properties in the site model
Args:
imt: Intensity measure type
shakemap: shakemap outputs from `get_shakemap(...)`
is_stddev: shakemap values are standard deviations (True) or expected ground motion
Returns:
shakemap as 2D numpy array
"""
if not self.site_model.bbox_properties:
raise ValueError(
"No bounding bbox properties found in SiteModel - "
"cannot export to raster format"
)
if is_stddev:
# If the input is a standard deviation then no transformation of the data
results = np.copy(shakemap[imt])
else:
# For absolute ground motion take the natural exponent
results = np.exp(shakemap[imt])
assert (
results.shape[0] == self.site_model.shape
), "Shakemap dimensions do not correspond to site model"
# Reshape and export to raster
shakemap_shape = [
self.site_model.bbox_properties["nrow"],
self.site_model.bbox_properties["ncol"],
]
return results.reshape(shakemap_shape)
def to_geotiff(
self,
shakemap: np.ndarray,
imt: str,
filename: Optional[str] = None,
is_stddev: bool = False,
) -> Union[io.BytesIO, None]:
"""
Exports the shakemap for a given intensity measure type to GeoTiff format
Args:
shakemap: shakemap outputs from `get_shakemap(...)`
imt: Intensity measure type
filename: Path to geotiff file for export. If None then returns the geotiff as a
bytes object
is_stddev: shakemap values are standard deviations (True) or expected ground motion
"""
results = self._transform_to_raster(imt, shakemap, is_stddev)
spcx = self.site_model.bbox_properties["spcx"]
spcy = self.site_model.bbox_properties["spcy"]
# Build the transformation object (holds the geospatial information about the raster)
transform = (
rasterio.transform.Affine.translation(
self.site_model.bbox_properties["bbox"][0] - (spcx / 2.0),
self.site_model.bbox_properties["bbox"][1] - (spcy / 2.0),
)
* rasterio.transform.Affine.scale(spcx, spcy)
)
# Export to file
if filename:
with rasterio.open(
filename,
"w",
"GTiff",
height=results.shape[0],
width=results.shape[1],
count=1,
dtype=results.dtype,
crs="+proj=latlong",
transform=transform,
compress="lzw",
) as dst:
dst.write(results, 1)
return
# Returns a bytes object
with rasterio.MemoryFile(ext=".tif") as memfile:
with memfile.open(
driver="GTiff",
height=results.shape[0],
width=results.shape[1],
count=1,
dtype=results.dtype,
crs="+proj=latlong",
transform=transform,
compress="lzw",
) as dst:
dst.write(results, 1)
buffer = memfile.getbuffer()
io_buffer = io.BytesIO(buffer)
as_bytes = io_buffer.read()
return as_bytes
def to_esri_ascii(
self,
shakemap: np.ndarray,
imt: str,
filename: str,
is_stddev: bool = False,
nodata: Optional[float] = None,
fmt: str = "%.4e",
):
"""
Exports the shakemap for a given intensity measure type to ESRI ascii format
Args:
shakemap: shakemap outputs from `get_shakemap(...)`
imt: Intensity measure type
filename: Path to ESRI Ascii file for export
is_stddev: shakemap values are standard deviations (True) or expected ground motion
"""
if not np.isclose(
self.site_model.bbox_properties["spcx"], self.site_model.bbox_properties["spcy"]
):
# ESRI Grid files cannot accommodate this
raise IOError(
"ESRI ascii files cannot accommodate unequal spacing "
"in the X and Y dimension - try GeoTIFF instead"
)
results = self._transform_to_raster(imt, shakemap, is_stddev)
# Export the raster to file
with open(filename, "w") as f:
# Write header information
f.write("NCOLS %g\n" % self.site_model.bbox_properties["ncol"])
f.write("NROWS %g\n" % self.site_model.bbox_properties["nrow"])
f.write("XLLCENTER %.8f\n" % self.site_model.bbox_properties["bbox"][0])
f.write("YLLCENTER %.8f\n" % self.site_model.bbox_properties["bbox"][1])
f.write("CELLSIZE %.8e\n" % self.site_model.bbox_properties["spcx"])
if nodata is not None:
f.write("NODATA_VALUE %.8f\n" % nodata)
# Write the data
np.savetxt(f, results, fmt=fmt)
return
def get_contours(
self,
imt: str,
shakemap: np.ndarray,
levels: Union[int, np.ndarray] = 10,
is_stddev: bool = False,
) -> gpd.GeoDataFrame:
"""
For a shakemap of a given intensity measure type, retrieves the
set of contours and exports to a geopandas GeoDataframe
Args:
imt: Intensity measure type
shakemap: shakemap outputs from `get_shakemap(...)`
levels: Number of levels for the contours or pre-defined set of levels (see
documentation for matplotlib.pyplot.contour)
is_stddev: shakemap values are standard deviations (True) or expected ground motion
Returns:
Contour set as geopandas GeoDataframe
"""
# For contouring we need the shakemap in 2D
results = self._transform_to_raster(imt, shakemap, is_stddev)
new_shape = [
self.site_model.bbox_properties["nrow"],
self.site_model.bbox_properties["ncol"],
]
lons = self.site_model["lon"].reshape(new_shape)
lats = self.site_model["lat"].reshape(new_shape)
# Surpress plotting
plt.ioff()
fig = plt.figure()
ax = fig.add_subplot(111)
# Get the contours
contours = ax.contour(lons, lats, results, levels)
contour_levels = []
contour_geometries = []
# Retrieve the x, y coordinates of each contour
for level, segs in zip(contours.levels, contours.allsegs):
if not len(segs):
continue
else:
geometries = []
for seg in segs:
geometries.append(LineString([(row[0], row[1]) for row in seg]))
if len(geometries) > 1:
# Has multiple contour lines for this level
contour_geometries.append(MultiLineString(geometries))
else:
# Only a single contour for this level
contour_geometries.append(geometries[0])
contour_levels.append(level)
plt.close(fig)
# Re-allow plotting
plt.ion()
# Create geodataframe
dframe = gpd.GeoDataFrame(
{imt: contour_levels, "geometry": gpd.GeoSeries(contour_geometries)}
)
dframe.crs = {"init": "epsg:4326"}
return dframe
def contours_to_file(
self,
shakemap: np.ndarray,
imt: str,
filename: str,
levels: Union[int, np.ndarray] = 10,
is_stddev: bool = False,
driver: str = "GeoJSON",
):
"""
Exports the contours of a shapefile to a vector spatial format
Args:
imt: Intensity measure type
shakemap: shakemap outputs from `get_shakemap(...)`
levels: Number of levels for the contours or pre-defined set of levels (see
documentation for matplotlib.pyplot.contour)
is_stddev: shakemap values are standard deviations (True) or expected ground motion
driver: Any Vector spatial file driver supported by Fiona
https://github.com/Toblerity/Fiona/blob/master/fiona/drvsupport.py
"""
contours = self.get_contours(imt, shakemap, levels, is_stddev)
contours.to_file(filename, driver=driver)
return
This diff is collapsed.
This diff is collapsed.
......@@ -4,7 +4,9 @@ Set of test cases for the shakemap class
import os
import unittest
import h5py
import rasterio
import numpy as np
from geopandas import GeoDataFrame
from shakyground2.earthquake import Earthquake
from shakyground2.site_model import SiteModel
from shakyground2.shakemap import Shakemap
......@@ -105,3 +107,191 @@ class ShakemapTestCase(unittest.TestCase):
def tearDown(self):
# Delete the cache file
os.remove(self.cache_test_file)
class ShakemapExportersTestCase(unittest.TestCase):
"""
General test case for the shakemap exporters - synthetic 2D shakemap based on the function
matplotlib contour demo (https://tinyurl.com/22sdmajk)
"""
def setUp(self):
self.site_model = SiteModel.from_bbox(
[-3.0, -2.0, 3.0, 2.0], 0.025, 0.025, vs30=800.0, z1pt0=10.0
)
self.shakemap = Shakemap(
Earthquake("XYZ", 0.0, 0.0, 10.0, 6.0),
self.site_model,
[("B14", GSIM_LIST["BindiEtAl2014Rjb"](), 1.0)],
"ASC",
)
# 2D smooth function for contouring
z1 = np.exp(-self.site_model["lon"] ** 2 - self.site_model["lat"] ** 2)
z2 = np.exp(-((self.site_model["lon"] - 1) ** 2) - (self.site_model["lat"] - 1) ** 2)
self.dummy = (z1 - z2) * 2
def test_transform_to_raster(self):
# Test transformation to raster
# Map the 2D function (double peaked) into a shakemap output format
results = np.zeros(self.site_model.shape, dtype=np.dtype([("PGA", np.float64)]))
results["PGA"] = np.log(np.fabs(self.dummy) + 0.001)
results_as_raster = self.shakemap._transform_to_raster("PGA", results)
# Verify that this gives the expected results
delta = 0.025
x = np.arange(-3.0, 3.0 + delta, delta)
y = np.arange(-2.0, 2.0 + delta, delta)
X, Y = np.meshgrid(x, y)
Z1 = np.exp(-(X ** 2.0) - Y ** 2)
Z2 = np.exp(-((X - 1) ** 2) - (Y - 1) ** 2)
Z = np.fabs((Z1 - Z2) * 2) + 0.001
np.testing.assert_array_almost_equal(results_as_raster, Z)
def test_incorrect_raster_formulations(self):
# Tests the cases where the raster cannot be built
# Case 1 - No bbox properties in Site Model
site_model = SiteModel.from_bbox(
[-3.0, -2.0, 3.0, 2.0], 0.025, 0.025, vs30=800.0, z1pt0=10.0
)
# Re-set bounding box properties
site_model.bbox_properties = {}
shakemap = Shakemap(
Earthquake("XYZ", 0.0, 0.0, 10.0, 6.0),
site_model,
[("B14", GSIM_LIST["BindiEtAl2014Rjb"](), 1.0)],
"ASC",
)
results = np.zeros(site_model.shape, dtype=np.dtype([("PGA", np.float64)]))
results["PGA"] = np.log(np.fabs(self.dummy) + 0.001)
with self.assertRaises(ValueError) as ve:
shakemap._transform_to_raster("PGA", results)
self.assertEqual(
str(ve.exception),
"No bounding bbox properties found in SiteModel - cannot export to raster format",
)
# Case 2 - Site Model has difference size to shakemap
site_model2 = SiteModel.from_bbox(
[-3.0, -2.0, 3.0, 2.0], 0.5, 0.5, vs30=800.0, z1pt0=10.0
)
# Re-set bounding box properties
shakemap2 = Shakemap(
Earthquake("XYZ", 0.0, 0.0, 10.0, 6.0),
site_model2,
[("B14", GSIM_LIST["BindiEtAl2014Rjb"](), 1.0)],
"ASC",
)
with self.assertRaises(AssertionError) as ae:
shakemap2._transform_to_raster("PGA", results)
self.assertEqual(
str(ae.exception), "Shakemap dimensions do not correspond to site model"
)
def test_export_to_raster_geotiff_file(self):
# Test raster export to geotiff as a round trip
results = np.zeros(self.site_model.shape, dtype=np.dtype([("PGA", np.float64)]))
results["PGA"] = np.copy(self.dummy)
results_as_raster = self.shakemap._transform_to_raster("PGA", results)
geotiff_fname = os.path.join(DATA_PATH, "dummy_raster.tif")
self.shakemap.to_geotiff(results, "PGA", geotiff_fname)
# Load back in the geotiff and verify that the data content is the same
data = rasterio.open(geotiff_fname)
self.assertEqual(data.height, results_as_raster.shape[0])
self.assertEqual(data.width, results_as_raster.shape[1])
np.testing.assert_array_almost_equal(data.read(1), results_as_raster)
os.remove(geotiff_fname)
def test_export_to_raster_geotiff_bytes(self):
# Test raster export to geotiff as an in memory file - compares this against the
# case that the raster is exported directly to file and verifies that the relevant
# content is the same
results = np.zeros(self.site_model.shape, dtype=np.dtype([("PGA", np.float64)]))
results["PGA"] = np.copy(self.dummy)
data_bytes = self.shakemap.to_geotiff(results, "PGA")
byte_shakemap_file = os.path.join(DATA_PATH, "dummy_shakemap_from_bytes.tif")
with open(byte_shakemap_file, "wb") as f:
f.write(data_bytes)
# Verify that this produces the same shakemap as direct export
shakemap_file = os.path.join(DATA_PATH, "dummy_shakemap.tif")
self.shakemap.to_geotiff(results, "PGA", shakemap_file)
# Compare the two files
raster_1 = rasterio.open(byte_shakemap_file)
raster_2 = rasterio.open(shakemap_file)
# Compare bounding boxes
for pos in ["left", "right", "top", "bottom"]:
self.assertAlmostEqual(getattr(raster_1.bounds, pos), getattr(raster_2.bounds, pos))
# Compare dimensions
self.assertEqual(raster_1.width, raster_2.width)
self.assertEqual(raster_1.height, raster_2.height)
for i in range(2):
self.assertAlmostEqual(raster_1.res[i], raster_2.res[i])
# Check data are equal
np.testing.assert_array_almost_equal(raster_1.read(1), raster_2.read(1))
# Remove testing diles
os.remove(byte_shakemap_file)
os.remove(shakemap_file)
def test_export_to_esri_ascii(self):
# Test raster export to ESRI ascii as a round trip
results = np.zeros(self.site_model.shape, dtype=np.dtype([("PGA", np.float64)]))
results["PGA"] = np.copy(self.dummy)
results_as_raster = self.shakemap._transform_to_raster("PGA", results, is_stddev=True)
ascii_fname = os.path.join(DATA_PATH, "dummy_raster.asc")
self.shakemap.to_esri_ascii(results, "PGA", ascii_fname, is_stddev=True)
# Reload in the ascii file and verify that the data content is the same
data = np.genfromtxt(ascii_fname, delimiter=" ", skip_header=5)
np.testing.assert_array_almost_equal(data, results_as_raster, 4)
os.remove(ascii_fname)
def _compare_contour_geodataframes(
self, dframe1: GeoDataFrame, dframe2: GeoDataFrame, imt: str, tol: float = 1.0e-7
):
"""
Checks that both the geodataframes are equal for a given intensity measure type
"""
self.assertIsInstance(dframe1, GeoDataFrame)
self.assertIsInstance(dframe2, GeoDataFrame)
np.testing.assert_array_almost_equal(
dframe1[imt].to_numpy(), dframe1[imt].to_numpy(), tol
)
for geom1, geom2 in zip(dframe1.geometry, dframe2.geometry):
self.assertTrue(geom1.equals_exact(geom2, tol))
def test_get_contours_double_peak(self):
# Tests the extraction of the contours for the case with multiple peaks (i.e. the
# geometry should correspond to a MultiLineString object)
results = np.zeros(self.site_model.shape, dtype=np.dtype([("PGA", np.float64)]))
results["PGA"] = np.log(np.fabs(self.dummy) + 0.001)
contours = self.shakemap.get_contours("PGA", results, levels=[0.5, 1.0, 1.5])
# Contours should now be a pandas geodataframe - compare against reference results
expected_filename = os.path.join(DATA_PATH, "contours_test_double_peak.geojson")
expected_dframe = GeoDataFrame.from_file(expected_filename, driver="GeoJSON")
self._compare_contour_geodataframes(contours, expected_dframe, "PGA")
def test_get_contours_single_peak(self):
# Tests the extraction of the contours for the case with multiple peaks (i.e. the
# geometry should correspond to a LineString object)
results = np.zeros(self.site_model.shape, dtype=np.dtype([("PGA", np.float64)]))
results["PGA"] = np.log(np.clip(self.dummy + 0.001, 0.001, np.inf))
contours = self.shakemap.get_contours("PGA", results, levels=[0.5, 1.0, 1.5])
# Contours should now be a pandas geodataframe - compare against reference results
expected_filename = os.path.join(DATA_PATH, "contours_test_single_peak.geojson")
expected_dframe = GeoDataFrame.from_file(expected_filename, driver="GeoJSON")
self._compare_contour_geodataframes(contours, expected_dframe, "PGA")
def test_export_contours_geojson(self):
# Redo double peak test but now exporting results to geojson and reloading
results = np.zeros(self.site_model.shape, dtype=np.dtype([("PGA", np.float64)]))
results["PGA"] = np.log(np.fabs(self.dummy) + 0.001)
contours = self.shakemap.get_contours("PGA", results, levels=[0.5, 1.0, 1.5])
# Run the same function but export to geojson
test_file = os.path.join(DATA_PATH, "tmp_contour_file.geojson")
self.shakemap.contours_to_file(results, "PGA", test_file, levels=[0.5, 1.0, 1.5])
# Reload from geojson and make sure the dataframes are the same
expected_dframe = GeoDataFrame.from_file(test_file, driver="GeoJSON")
self._compare_contour_geodataframes(
contours,
expected_dframe,
"PGA",
)
os.remove(test_file)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment