""" Implements the core shakemap class """ import io import h5py import numpy as np import warnings import rasterio import geopandas as gpd import matplotlib.pyplot as plt from typing import Dict, Optional, Tuple, List, Union from shapely.geometry import LineString, MultiLineString from openquake.hazardlib import const, imt from openquake.hazardlib.contexts import ContextMaker from shakyground2 import valid from shakyground2.earthquake import Earthquake from shakyground2.site_model import SiteModel from shakyground2.synthetic_rupture_generator import FiniteRuptureSampler class Shakemap(object): """ The core class for calculating shakemaps. The general workflow contained within takes the input earthquake and site model in order to build the source, path and site context objects (i.e. containers that hold the data needed for the ground motion models). The contexts objects are built upon instantiation of the shakemap class to avoid recalculation if further shakemaps are needed. The Shakemap class also caches the information into an hdf5 binary object, if desired by the user. This means that the shakemap information for an earthquake ccould be retrieved at a later time without the need for recalculation. """ def __init__( self, earthquake: Earthquake, site_model: SiteModel, ground_motion_model: List, tectonic_region: str, cache_file: Optional[str] = None, num_rupture_samples: int = 100, rdim: float = 0.0, synth_dist_weights: List = [0.25, 0.25, 0.25, 0.25], synthetic_rupture_max_site_distance: float = 200.0, synthetic_rupture_site_spacing: float = 0.05, ): """ Args: earthquake: Earthquake as an instance of the :class:shakyground2.earthquake.Earthquake site_model: Target sites used for calculation of the ground motion values, as instances of :class:shakyground2.site_model.SiteModel ground_motion_model: Set of ground motion models and their respective weights as a list of tuples of (ID, GMM, Weight) where ID is the unique ID of the GMM and its associated weight tectonic_region: Tectonic region to which the earthquake is assigned cache_file: Path to the hdf5 file for caching the results. It does not need to exist but if the hdf5 file exists and contains an instance of the same event (according to the earthquake.id) then the user will be warned here and the existing shakemaps for that earthquake ID subsequently overwitten if the user persists to implement the shakemap. num_rupture_samples: In the likely case that no rupture surface is available for the earthquake, one will be generated by the :class:`shakyground2.synthetic_rupture_generator. FiniteRuptureSampler`. This controls the number of samples to be used rdim: Exponent of the distance-dependent site weighing adopted by the :class: `shakyground2.synthetic_rupture_generator.FiniteRuptureSampler` synth_dist_weights: Can adjust the weight assigned to the different distance metrics used by the FiniteRuptureSampler synthetic_rupture max_site_distance: In the case that the target sites for the FiniteRuptureSampler need to be built from the bounding box, this defines the maximum distances of the target sites to define the bounding box synthetic_rupture_site_spacing: In the case that the target sites for the FiniteRuptureSampler need to be built from the bounding box, this defines the site spacing of the sites """ self.earthquake = earthquake self.site_model = site_model self.tectonic_region = tectonic_region self.ground_motion_model = ground_motion_model self.num_rupture_samples = num_rupture_samples self.rdim = rdim self.synthetic_rupture_distance_weights = synth_dist_weights if cache_file: self.caching = True self.cache_file = cache_file # Check if the earthquake is cleady in the case and warn the user if so fle = h5py.File(self.cache_file, "a") if earthquake.id in list(fle): warnings.warn( "Earthquake %s already in cache file %s - " "Running the shakemaps will overwrite this" % (self.earthquake.id, self.cache_file) ) fle.close() else: self.caching = False self.cache_file = None self.rctx = None self.dctx = None self.sctx = None self.synth_rupture_max_site_dist = valid.positive_float( synthetic_rupture_max_site_distance, "Max. Synthetic Rupture Site Distance" ) self.synth_rupture_site_spacing = valid.positive_float( synthetic_rupture_site_spacing, "Synthetic Rupture Site Spacing" ) self._build_contexts() def _build_contexts(self): """ Construct the rupture, site and distances contexts from the earthquake, site and ground motion models """ cmaker = ContextMaker( self.tectonic_region, [gmm[1] for gmm in self.ground_motion_model] ) if not self.earthquake.rupture: # Use the finite rupture sampler to generate the rupture and corresponding # distances from the available information self.earthquake._rupture = FiniteRuptureSampler().get_finite_rupture( self.num_rupture_samples, self.earthquake, rdim=self.rdim, weights=self.synthetic_rupture_distance_weights, maximum_site_distance=self.synth_rupture_max_site_dist, site_spacing=self.synth_rupture_site_spacing, )[0] # For the sites and rupture context we can use the context maker to get all of the # source, site and rupture distances self.rctx, self.sctx, self.dctx = cmaker.make_contexts( self.site_model.get_site_collection(), self.earthquake.rupture ) def _cache_contexts(self, grp: h5py.Group): """ If caching the shakemaps, then this stores the context information to the file """ # Add the contexts to the group object ctxt = grp.create_group("contexts") rup_ctxt = ctxt.create_group("rupture") dist_ctxt = ctxt.create_group("distances") for gmm in [gmm[1] for gmm in self.ground_motion_model]: for rup_attr in gmm.REQUIRES_RUPTURE_PARAMETERS: if rup_attr not in rup_ctxt.attrs: rup_ctxt.attrs[rup_attr] = getattr(self.rctx, rup_attr) for attr in gmm.REQUIRES_DISTANCES: if attr not in list(dist_ctxt): distance = getattr(self.dctx, attr) dist_dset = dist_ctxt.create_dataset(attr, distance.shape, dtype="f") dist_dset[:] = distance site_ctxt = ctxt.create_dataset( "sites", self.site_model.site_array.shape, dtype=self.site_model.site_array.dtype, ) site_ctxt[:] = self.site_model.site_array if self.site_model.bbox_properties: # If the site model has bounding box properties then store these # as attributes site_ctxt.attrs["has_bbox"] = True site_ctxt.attrs["llon"] = self.site_model.bbox_properties["bbox"][0] site_ctxt.attrs["llat"] = self.site_model.bbox_properties["bbox"][1] site_ctxt.attrs["ulon"] = self.site_model.bbox_properties["bbox"][2] site_ctxt.attrs["ulat"] = self.site_model.bbox_properties["bbox"][3] site_ctxt.attrs["spcx"] = self.site_model.bbox_properties["spcx"] site_ctxt.attrs["spcy"] = self.site_model.bbox_properties["spcy"] site_ctxt.attrs["ncol"] = self.site_model.bbox_properties["ncol"] site_ctxt.attrs["nrow"] = self.site_model.bbox_properties["nrow"] else: site_ctxt.attrs["has_bbox"] = False return def get_shakemap( self, intensity_measure_types: List ) -> Tuple[np.ndarray, np.ndarray, Dict]: """ Main function to constructs the shakemaps for the specified intensity measures, caching the information to hdf5 if requested Args: intensity_measure_types: List of intensity measures for which the shakemaps will be calculated Returns: aggregated_means: The mean of the ground motions from the different ground motion models weighted by the assigned input weights aggregated_stddevs: The total standard deviation of the ground motions from the different ground motion models weighted by the assigned input weights shakemaps: Dictionary of individual shakemaps for each ground motion model """ shakemaps = {} shakemap_dtypes = np.dtype( [ (intensity_measure_type, np.float64) for intensity_measure_type in intensity_measure_types ] ) if self.caching: # If caching, open (or create) the file and cache the contexts fle = h5py.File(self.cache_file, "r+") if self.earthquake.id in list(fle): del fle[self.earthquake.id] fle_eq = fle.create_group(self.earthquake.id) self._cache_contexts(fle_eq) # Pre-allocate the aggregated shakemaps with zeros aggregated_means = np.zeros(self.site_model.shape, dtype=shakemap_dtypes) aggregated_stddevs = np.zeros(self.site_model.shape, dtype=shakemap_dtypes) for gmm_id, gmm, weight in self.ground_motion_model: shakemaps[gmm_id] = { "weight": weight, "mean": np.zeros(self.site_model.shape, dtype=shakemap_dtypes), "stddev": np.zeros(self.site_model.shape, dtype=shakemap_dtypes), } for intensity_measure_type in intensity_measure_types: input_imt = imt.from_string(intensity_measure_type) try: mean, [stddev] = gmm.get_mean_and_stddevs( self.sctx, self.rctx, self.dctx, input_imt, [const.StdDev.TOTAL] ) except KeyError: warnings.warn( "Ground motion model %s not defined for intensity " "measure type %s" % (str(gmm), intensity_measure_type) ) continue aggregated_means[intensity_measure_type] += weight * mean aggregated_stddevs[intensity_measure_type] += weight * stddev shakemaps[gmm_id]["mean"][intensity_measure_type] = mean shakemaps[gmm_id]["stddev"][intensity_measure_type] = stddev if self.caching: self._cache_shakemap( fle_eq, shakemaps, aggregated_means, aggregated_stddevs, shakemap_dtypes ) fle.close() return aggregated_means, aggregated_stddevs, shakemaps def _cache_shakemap( self, fle: h5py.Group, shakemaps: Dict, aggregated_means: np.ndarray, aggregated_stddevs: np.ndarray, shakemap_dtypes: np.dtype, ): """ If caching is required then the shakemaps are written to the hdf5 file Args: fle: HDF5 group object to store the shakemaps shakemaps: Dictionary of individual shakemaps for each ground motion model aggregated_means: The mean of the ground motions from the different ground motion models weighted by the assigned input weights aggregated_stddevs: The total standard deviation of the ground motions from the different ground motion models weighted by the assigned input weights shakemap_dtypes: IMT-dependent data type of the shakemaps """ shakemap_grp = fle.create_group("shakemaps") for gmm_id in shakemaps: gmm_grp = shakemap_grp.create_group(gmm_id) gmm_grp.attrs["weight"] = shakemaps[gmm_id]["weight"] mean_dset = gmm_grp.create_dataset( "mean", shakemaps[gmm_id]["mean"].shape, dtype=shakemap_dtypes ) mean_dset[:] = shakemaps[gmm_id]["mean"] stddev_dset = gmm_grp.create_dataset( "stddev", shakemaps[gmm_id]["stddev"].shape, dtype=shakemap_dtypes ) stddev_dset[:] = shakemaps[gmm_id]["stddev"] # Store the agregated results aggregated_grp = fle.create_group("aggregated") aggregated_mean_dset = aggregated_grp.create_dataset( "mean", aggregated_means.shape, shakemap_dtypes ) aggregated_mean_dset[:] = aggregated_means aggregated_stddev_dset = aggregated_grp.create_dataset( "stddev", aggregated_stddevs.shape, shakemap_dtypes ) aggregated_stddev_dset[:] = aggregated_stddevs def _transform_to_raster( self, imt: str, shakemap: np.ndarray, is_stddev: bool = False ) -> np.ndarray: """ Transforms a shakemap output into a 2D array for raster export based on the bounding box properties in the site model Args: imt: Intensity measure type shakemap: shakemap outputs from `get_shakemap(...)` is_stddev: shakemap values are standard deviations (True) or expected ground motion Returns: shakemap as 2D numpy array """ if not self.site_model.bbox_properties: raise ValueError( "No bounding bbox properties found in SiteModel - " "cannot export to raster format" ) if is_stddev: # If the input is a standard deviation then no transformation of the data results = np.copy(shakemap[imt]) else: # For absolute ground motion take the natural exponent results = np.exp(shakemap[imt]) assert ( results.shape[0] == self.site_model.shape ), "Shakemap dimensions do not correspond to site model" # Reshape and export to raster shakemap_shape = [ self.site_model.bbox_properties["nrow"], self.site_model.bbox_properties["ncol"], ] return results.reshape(shakemap_shape) def to_geotiff( self, shakemap: np.ndarray, imt: str, filename: Optional[str] = None, is_stddev: bool = False, ) -> Union[io.BytesIO, None]: """ Exports the shakemap for a given intensity measure type to GeoTiff format Args: shakemap: shakemap outputs from `get_shakemap(...)` imt: Intensity measure type filename: Path to geotiff file for export. If None then returns the geotiff as a bytes object is_stddev: shakemap values are standard deviations (True) or expected ground motion """ results = self._transform_to_raster(imt, shakemap, is_stddev) spcx = self.site_model.bbox_properties["spcx"] spcy = self.site_model.bbox_properties["spcy"] # Build the transformation object (holds the geospatial information about the raster) transform = ( rasterio.transform.Affine.translation( self.site_model.bbox_properties["bbox"][0] - (spcx / 2.0), self.site_model.bbox_properties["bbox"][1] - (spcy / 2.0), ) * rasterio.transform.Affine.scale(spcx, spcy) ) # Export to file if filename: with rasterio.open( filename, "w", "GTiff", height=results.shape[0], width=results.shape[1], count=1, dtype=results.dtype, crs="+proj=latlong", transform=transform, compress="lzw", ) as dst: dst.write(results, 1) return # Returns a bytes object with rasterio.MemoryFile(ext=".tif") as memfile: with memfile.open( driver="GTiff", height=results.shape[0], width=results.shape[1], count=1, dtype=results.dtype, crs="+proj=latlong", transform=transform, compress="lzw", ) as dst: dst.write(results, 1) buffer = memfile.getbuffer() io_buffer = io.BytesIO(buffer) as_bytes = io_buffer.read() return as_bytes def to_esri_ascii( self, shakemap: np.ndarray, imt: str, filename: str, is_stddev: bool = False, nodata: Optional[float] = None, fmt: str = "%.4e", ): """ Exports the shakemap for a given intensity measure type to ESRI ascii format Args: shakemap: shakemap outputs from `get_shakemap(...)` imt: Intensity measure type filename: Path to ESRI Ascii file for export is_stddev: shakemap values are standard deviations (True) or expected ground motion """ if not np.isclose( self.site_model.bbox_properties["spcx"], self.site_model.bbox_properties["spcy"] ): # ESRI Grid files cannot accommodate this raise IOError( "ESRI ascii files cannot accommodate unequal spacing " "in the X and Y dimension - try GeoTIFF instead" ) results = self._transform_to_raster(imt, shakemap, is_stddev) # Export the raster to file with open(filename, "w") as f: # Write header information f.write("NCOLS %g\n" % self.site_model.bbox_properties["ncol"]) f.write("NROWS %g\n" % self.site_model.bbox_properties["nrow"]) f.write("XLLCENTER %.8f\n" % self.site_model.bbox_properties["bbox"][0]) f.write("YLLCENTER %.8f\n" % self.site_model.bbox_properties["bbox"][1]) f.write("CELLSIZE %.8e\n" % self.site_model.bbox_properties["spcx"]) if nodata is not None: f.write("NODATA_VALUE %.8f\n" % nodata) # Write the data np.savetxt(f, results, fmt=fmt) return def get_contours( self, imt: str, shakemap: np.ndarray, levels: Union[int, np.ndarray] = 10, is_stddev: bool = False, ) -> gpd.GeoDataFrame: """ For a shakemap of a given intensity measure type, retrieves the set of contours and exports to a geopandas GeoDataframe Args: imt: Intensity measure type shakemap: shakemap outputs from `get_shakemap(...)` levels: Number of levels for the contours or pre-defined set of levels (see documentation for matplotlib.pyplot.contour) is_stddev: shakemap values are standard deviations (True) or expected ground motion Returns: Contour set as geopandas GeoDataframe """ # For contouring we need the shakemap in 2D results = self._transform_to_raster(imt, shakemap, is_stddev) new_shape = [ self.site_model.bbox_properties["nrow"], self.site_model.bbox_properties["ncol"], ] lons = self.site_model["lon"].reshape(new_shape) lats = self.site_model["lat"].reshape(new_shape) # Surpress plotting plt.ioff() fig = plt.figure() ax = fig.add_subplot(111) # Get the contours contours = ax.contour(lons, lats, results, levels) contour_levels = [] contour_geometries = [] # Retrieve the x, y coordinates of each contour for level, segs in zip(contours.levels, contours.allsegs): if not len(segs): continue else: geometries = [] for seg in segs: geometries.append(LineString([(row[0], row[1]) for row in seg])) if len(geometries) > 1: # Has multiple contour lines for this level contour_geometries.append(MultiLineString(geometries)) else: # Only a single contour for this level contour_geometries.append(geometries[0]) contour_levels.append(level) plt.close(fig) # Re-allow plotting plt.ion() # Create geodataframe dframe = gpd.GeoDataFrame( {imt: contour_levels, "geometry": gpd.GeoSeries(contour_geometries)} ) dframe.crs = {"init": "epsg:4326"} return dframe def contours_to_file( self, shakemap: np.ndarray, imt: str, filename: str, levels: Union[int, np.ndarray] = 10, is_stddev: bool = False, driver: str = "GeoJSON", ): """ Exports the contours of a shapefile to a vector spatial format Args: imt: Intensity measure type shakemap: shakemap outputs from `get_shakemap(...)` levels: Number of levels for the contours or pre-defined set of levels (see documentation for matplotlib.pyplot.contour) is_stddev: shakemap values are standard deviations (True) or expected ground motion driver: Any Vector spatial file driver supported by Fiona https://github.com/Toblerity/Fiona/blob/master/fiona/drvsupport.py """ contours = self.get_contours(imt, shakemap, levels, is_stddev) contours.to_file(filename, driver=driver) return