diff --git a/Makefile b/Makefile index f2d19380117b4d9034209b4f74d97bc353931def..cf5fbc59eae5c96501be02be95fe807cd6b2d580 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ LENGTH=96 check: $(SOURCES) flake8 --max-line-length=$(LENGTH) $^ black --check --line-length $(LENGTH) $^ - pylint -E $^ + pylint -E --extension-pkg-whitelist='pydantic' $^ black: $(SOURCES) black --line-length $(LENGTH) $^ diff --git a/examples/api-use/api-example.py b/examples/api-use/api-example.py index 28fd550bbf858d07cd34ec5c2cf8f4863e2d2873..0a42ecf2ea91ba4b7656ff372d52cb0b0677c241 100644 --- a/examples/api-use/api-example.py +++ b/examples/api-use/api-example.py @@ -3,7 +3,10 @@ import json def main(): - # Option to combine scenario creation/run + # Option to combine scenario creation/run. The option to combine the `create_scenario` + # and `run_scenario` is purely convenience for the end-user. If a scenario is already + # created, but another loss-calculator run should be requested, this variable should be + # `False`. combine_create_and_run = False # General params @@ -25,7 +28,9 @@ def main(): db_output_file = "mosbach.db" if combine_create_and_run: - # Create and run scenario + # Create and run scenario. Optionally a bounding box can be given as parameter to + # the `create_and_run_scenario` request, but if this is not given, the function uses + # the provided QuakeML file to create a bounding box. requests.post( url=base_url + "create_and_run_scenario", headers=headers, @@ -37,12 +42,21 @@ def main(): }, ) else: + bbox = { + "lon_min": 8.63706, + "lat_min": 49.34848, + "lon_max": 8.77422, + "lat_max": 49.47723, + } # Create scenario - r = requests.post( + requests.post( url=base_url + "create_scenario", headers=headers, - data=QuakeML, - params={"scenario": scenario, "description": "Test model of Mosbach"}, + params={ + "scenario": scenario, + "description": "Test model of Mosbach", + "boundingbox": bbox, + }, ) # Run scenario @@ -63,7 +77,7 @@ def main(): ) # Create damage assessment report - r = requests.post( + requests.post( url=base_url + "create_damage_assessment_report", params={"scenario": scenario, "assessment_source": manual_assessment_source}, files={"damage_report": open("damage_report.json", "rb")}, diff --git a/losscalculator/api.py b/losscalculator/api.py index d5fadf66a303ac8d2912b14e0c3db1ae327051ef..a4caac5923954f19c44fef25fa05dc860ab66639 100644 --- a/losscalculator/api.py +++ b/losscalculator/api.py @@ -11,6 +11,7 @@ from fastapi import FastAPI, Body, HTTPException, File from fastapi.responses import FileResponse from obspy.io.quakeml.core import Unpickler from exposurelib.database import ExposureDatabase +from pydantic import BaseModel from losscalculator.damage_calculator import damage_calculator from losscalculator.shakemapservice import ShakemapiService @@ -20,6 +21,18 @@ logger = logging.getLogger(__name__) QuakeMLUnpickler = Unpickler() +class BoundingBox(BaseModel): + """ + The `BoundingBox` class implements a `BaseModel` for a bounding box that is used by the + `create_scenario` POST request. + """ + + lon_min: float + lon_max: float + lat_min: float + lat_max: float + + class LosscalculatorApi(FastAPI): """ The loss-calculator API communicates with end-users and is able to run the loss-calculator, @@ -350,8 +363,8 @@ async def get_scenario_database(scenario: str): return FileResponse(scenario_database_file) -@app.post("/create_scenario") -async def create_scenario( +@app.post("/create_scenario_from_quakeml") +async def create_scenario_from_quakeml( scenario: str, description: str, exposure_model_config: str = "Europe", @@ -368,34 +381,45 @@ async def create_scenario( description (str): Description of the scenario. exposure_model_config (str): - Name identifier of the configuration that is used. - This configuration includes a taxonomy-mapping file - and a fragilities file. + Name identifier of the configuration that is used. This configuration includes a + taxonomy-mapping file and a fragilities file. event_quakeml (Body): The parsed QuakeML file of the earthquake scenario. """ - async def _create_clipped_exposure_database( - source_exposure_model_file, exposure_model_database_name, bbox - ): - # Connect to exposure db - source_exposure_model = ExposureDatabase(database_filepath=source_exposure_model_file) + event_quakeml_copy = copy.deepcopy(event_quakeml) + bbox = await get_quakeml_bbox(event_quakeml_copy.encode()) + await create_scenario( + scenario=scenario, + description=description, + boundingbox=bbox, + exposure_model_config=exposure_model_config, + ) + return {} - # Export with bounding box - source_exposure_model.export( - export_filepath=exposure_model_database_name, bounding_box=bbox - ) - # Connect to clipped database - clipped_exposure_model = ExposureDatabase( - database_filepath=exposure_model_database_name - ) - clipped_exposure_model.connect() +@app.post("/create_scenario") +async def create_scenario( + scenario: str, + description: str, + boundingbox: Union[BoundingBox, None] = None, + exposure_model_config: str = "Europe", +): + """ + Create scenario for the losscalculator API. Using a bounding box as input, the source + exposure database is clipped and saved to the internal file system as a SpatiaLite database. - # Add status column to table Entity for mission planning - sql_statement = "ALTER TABLE Entity ADD status INTEGER;" - clipped_exposure_model.cursor.execute(sql_statement) - clipped_exposure_model.connection.commit() + Args: + scenario (str): + Name of the scenario. + description (str): + Description of the scenario. + boundingbox (BoundingBox): + The extent of the scenario. + exposure_model_config (str): + Name identifier of the configuration that is used. This configuration includes a + taxonomy-mapping file and a fragilities file. + """ # Check if scenario already exists if app.scenario_exists(scenario): @@ -410,9 +434,9 @@ async def create_scenario( # Clip exposure model in the QuakeML file and copy to a new exposure database exposure_model_db_name = os.path.join(app.data_dir, f"{scenario}Exposure.db") - event_quakeml_copy = copy.deepcopy(event_quakeml) - bbox = await get_quakeml_bbox(event_quakeml_copy.encode()) - await _create_clipped_exposure_database(app.exposure_database, exposure_model_db_name, bbox) + await create_clipped_exposure_database( + app.exposure_database, exposure_model_db_name, boundingbox + ) # Create a new scenario in the config file app.add_scenario_to_config( @@ -424,6 +448,40 @@ async def create_scenario( return {} +async def create_clipped_exposure_database( + source_exposure_model_file: str, clipped_exposure_model_file: str, boundingbox: BoundingBox +): + """ + Creates a new exposure database on the server from the source exposure database, using a + bounding box as input. + + Args: + source_exposure_model_file (str): + Filepath to the source exposure model. + clipped_exposure_model_file (str): + Filepath to the created clipped exposure model. + boundingbox (BoundingBox): + Bounding box that is used as clipping mask. + """ + + # Connect to exposure db + source_exposure_model = ExposureDatabase(database_filepath=source_exposure_model_file) + + # Export with bounding box + source_exposure_model.export( + export_filepath=clipped_exposure_model_file, bounding_box=boundingbox + ) + + # Connect to clipped database + clipped_exposure_model = ExposureDatabase(database_filepath=clipped_exposure_model_file) + clipped_exposure_model.connect() + + # Add status column to table Entity for mission planning + sql_statement = "ALTER TABLE Entity ADD status INTEGER;" + clipped_exposure_model.cursor.execute(sql_statement) + clipped_exposure_model.connection.commit() + + @app.post("/run_scenario") async def run_scenario( scenario: str, @@ -480,6 +538,7 @@ async def run_scenario( @app.post("/create_and_run_scenario") async def create_and_run_scenario( event_quakeml: str = Body(...), + boundingbox: Union[BoundingBox, None] = None, scenario: Union[str, None] = None, scenario_description: Union[str, None] = None, exposure_model_config: str = "Europe", @@ -502,14 +561,21 @@ async def create_and_run_scenario( assessment_source (str): Assessment identifier (e.g. campaign name) """ - # Create scenario - await create_scenario( - event_quakeml=event_quakeml, - scenario=scenario, - description=scenario_description, - exposure_model_config=exposure_model_config, - ) + if boundingbox is None: + await create_scenario_from_quakeml( + event_quakeml=event_quakeml, + scenario=scenario, + description=scenario_description, + exposure_model_config=exposure_model_config, + ) + else: + await create_scenario( + scenario=scenario, + description=scenario_description, + exposure_model_config=exposure_model_config, + boundingbox=boundingbox, + ) # Run scenario return await run_scenario( @@ -694,12 +760,12 @@ async def get_quakeml_bbox(event_quakeml: str, margin: float = 3.0): """ lon, lat = await get_quakeml_origin(event_quakeml) - return { - "lon_min": lon - margin, - "lat_min": lat - margin, - "lon_max": lon + margin, - "lat_max": lat + margin, - } + return BoundingBox( + lon_min=lon - margin, + lat_min=lat - margin, + lon_max=lon + margin, + lat_max=lat + margin, + ) def command_line_interface():