Commit 7388bf24 authored by Cecilia Nievas

Implemented feature to store information on aggregated sources

parent d83781d2
Pipeline #28843 passed in 2 minutes and 51 seconds
......@@ -12,3 +12,9 @@ database_built_up: # Database where built-up areas per quadtile are stored
username: username
password: password_of_username
sourceid: 1
database_gde_tiles: # Database where info on the GDE tiles is stored
host: host_name
dbname: database_name
port: port_number # Leave empty if a port number is not needed
username: username
password: password_of_username
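For orientation, the sketch below shows how this new credentials block is consumed: the values end up in a dictionary that is passed to the project's Database helper as keyword arguments, exactly as main() does further down in this commit. The placeholder values and the directly built dictionary are illustrative only; the real code builds the dictionary through the Configuration class.

from gdeimporter.tools.database import Database

# Placeholder credentials mirroring the database_gde_tiles block above
# (in the real code this dictionary is built by the Configuration class)
credentials = {
    "host": "host_name",
    "dbname": "database_name",
    "username": "username",
    "password": "password_of_username",
}

# Open a connection and cursor as main() does, run a trivial query, and close
db_gde_tiles = Database(**credentials)
db_gde_tiles.create_connection_and_cursor()
db_gde_tiles.cursor.execute("SELECT 1;")
db_gde_tiles.close_connection()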
......@@ -84,6 +84,83 @@ class AggregatedExposureModel(abc.ABC):
raise NotImplementedError
def write_source_to_database(self, db_cursor, db_table):
"""This function stores self.model_name, self.exposure_format to an SQL database table
named db_table. The cursor to the database is provided as db_cursor.
If an entry already exists in db_table with name = self.model_name, the format field is
updated as per self.exposure_format and the aggregated_source_id remains the same. If an
entry does not already exist, it is created. It is assumed that the database table can
automatically assign sequential values of aggregated_source_id. The assigned value is
returned by this function.
Args:
db_cursor (psycopg2.extensions.cursor):
Cursor object used to execute commands on the target SQL database.
db_table (str):
Name of the table within the SQL database (associated with db_cursor), in which
information on the AggregatedExposureModel will be stored. This table is assumed
to contain, at least, the following fields:
aggregated_source_id (int):
ID of the source of the aggregated exposure model.
name (str):
Name of the source of the aggregated exposure model.
format (str):
Format of the source files.
Returns:
aggregated_source_id (int):
ID of the aggregated source in db_table.
writing_mode (str):
"Inserted", if an entry did not already exist in db_table with name =
self.model_name and was created; "Updated" if an entry already existed in
db_table with name = self.model_name and then only the format was updated;
"Error" if more than one entries already existed in db_table with name =
self.model_name and thus nothing was done.
"""
sql_commands = {}
sql_commands["insert"] = "INSERT INTO %s(name,format) VALUES('%s','%s');" % (
db_table,
self.model_name,
self.exposure_format,
)
sql_commands["update"] = "UPDATE %s SET format='%s' WHERE name='%s';" % (
db_table,
self.exposure_format,
self.model_name,
)
sql_commands["query"] = "SELECT aggregated_source_id FROM %s WHERE name='%s';" % (
db_table,
self.model_name,
)
# Check if an entry already exists for a source with this name
db_cursor.execute(sql_commands["query"])
exec_result = db_cursor.fetchall()
if len(exec_result) == 0: # No entry with this name exists yet --> append
db_cursor.execute(sql_commands["insert"])
# Retrieve the source ID (assigned automatically by SQL)
db_cursor.execute(sql_commands["query"])
exec_result = db_cursor.fetchall()
aggregated_source_id = exec_result[0][0]
writing_mode = "Inserted"
elif len(exec_result) == 1: # Entry exists --> overwrite
db_cursor.execute(sql_commands["update"])
# Retrieve the source ID
aggregated_source_id = exec_result[0][0]
writing_mode = "Updated"
else: # More than one entry found; this is an error
logger.error(
"ERROR IN write_source_to_database: "
"MORE THAN ONE ENTRY FOUND FOR SOURCE NAME %s" % (self.model_name)
)
aggregated_source_id = numpy.nan
writing_mode = "Error"
return aggregated_source_id, writing_mode
class ExposureModelESRM20(AggregatedExposureModel):
"""This class represents the European Seismic Risk Model 2020 (ESRM20) aggregated exposure
......
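The intended use of write_source_to_database is sketched below: any open database cursor can be passed in, together with the name of the target table. The psycopg2 connection parameters and the construction of the model from a configuration file are placeholders and assumptions; in this commit the cursor actually comes from the Database helper (see main() below).

import psycopg2

from gdeimporter.aggregatedexposuremodel import ExposureModelESRM20
from gdeimporter.configuration import Configuration

# Placeholder connection parameters; the real code obtains them from
# config.database_gde_tiles via the Database helper
connection = psycopg2.connect(
    host="localhost", dbname="gde_tiles", user="username", password="password"
)
cursor = connection.cursor()

config = Configuration("config.yml")  # placeholder path to a configuration file
aem = ExposureModelESRM20(config)  # assumption: the subclass is built from a Configuration
source_id, writing_mode = aem.write_source_to_database(cursor, "aggregated_sources")
if writing_mode == "Error":
    # More than one row already carried this model name; nothing was written
    pass

connection.commit()
connection.close()

Note that the SQL strings above are assembled with %-interpolation of internally defined values; if the model name ever came from user input, psycopg2's parameter substitution (cursor.execute(sql, params)) would be the safer pattern.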
......@@ -60,6 +60,20 @@ class Configuration:
Password associated with self.username.
sourceid (int):
ID of the built-up area source dataset that will be sought for.
self.database_gde_tiles (dict):
Dictionary containing the credentials needed to connect to the SQL database in which
information on the GDE tiles is stored. The exact parameters needed depend on the
database. They can be:
host (str):
SQL database host address.
dbname (str):
Name of the SQL database.
port (int):
Port where the SQL database can be found.
username (str):
User name to connect to the SQL database.
password (str):
Password associated with self.username.
"""
REQUIRES = [
......@@ -69,6 +83,7 @@ class Configuration:
"exposure_entities_to_run",
"number_cores",
"database_built_up",
"database_gde_tiles",
]
def __init__(self, filepath, force_config_over_hierarchies=False):
......@@ -99,6 +114,9 @@ class Configuration:
self.database_built_up = self._retrieve_database_credentials(
config, "database_built_up", "test_db_built_up.env", force_config_over_hierarchies
)
self.database_gde_tiles = self._retrieve_database_credentials(
config, "database_gde_tiles", "test_db_gde_tiles.env", force_config_over_hierarchies
)
# Terminate if critical parameters are missing (not all parameters are critical)
for key_parameter in self.REQUIRES:
......
......@@ -20,6 +20,8 @@ import logging
import sys
from gdeimporter.configuration import Configuration
from gdeimporter.aggregatedexposuremodel import ExposureModelESRM20
from gdeimporter.tools.database import Database
# Add a logger printing error, warning, info and debug messages to the screen
logger = logging.getLogger()
......@@ -47,6 +49,15 @@ def main():
if len(aem.exposure_entities) < 1:
raise ValueError("no exposure entities found in %s" % (aem.model_name))
# Store information on the source of the aggregated exposure model
db_gde_tiles = Database(**config.database_gde_tiles)
db_gde_tiles.create_connection_and_cursor()
aem_source_id, aem_writing_mode = aem.write_source_to_database(
db_gde_tiles.cursor, "aggregated_sources"
)
logger.info("Source ID %s: %s" % (aem_source_id, aem_writing_mode))
db_gde_tiles.close_connection()
# Interpret and update config.exposure_entities_to_run
config.interpret_exposure_entities_to_run(aem)
......
......@@ -26,16 +26,19 @@ load_dotenv(Path(".env").resolve())
@pytest.fixture
def built_up_area_test_db():
"""A test database simulating to be obm_built_area_assessments."""
def test_db():
"""A test database simulating to contain the following tables:
- obm_built_area_assessments (of the OBM Tiles database)
- aggregated_sources (of the GDE Tiles database).
"""
init_built_up_db()
init_test_db()
return
def init_built_up_db():
"""Populates the test database that simulates to be obm_built_area_assessments with a basic
schema and data.
def init_test_db():
"""Populates the test database that simulates to contain obm_built_area_assessments and
aggregated_sources with a basic schema and data.
"""
if "GDEIMPORTER_DB_HOST" in os.environ: # When running the CI pipeline
......@@ -51,10 +54,16 @@ def init_built_up_db():
db = Database(**db_built_up_config)
db.create_connection_and_cursor()
# Create columns and populate the test database
# Create columns and populate the obm_built_area_assessments table
with open("tests/data/test_database_built_up.sql", "r") as file:
for command in file.read().split(";"):
if command != "\n":
db.cursor.execute(command)
# Create columns and populate the aggregated_sources table
with open("tests/data/test_database_aggregated_sources.sql", "r") as file:
for command in file.read().split(";"):
if command != "\n":
db.cursor.execute(command)
db.close_connection()
......@@ -12,3 +12,9 @@ database_built_up:
username: xxx
password: xxx
sourceid: x
database_gde_tiles:
host: xxx
dbname: xxx
port: 1111
username: xxx
password: xxx
......@@ -10,3 +10,8 @@ database_built_up:
dbname: some_database_name
username: some_username
password: some_password
database_gde_tiles:
host: host.somewhere.xx
dbname: some_database_name
username: some_username
password: some_password
......@@ -10,3 +10,8 @@ database_built_up:
dbname: some_database_name
username: some_username
password: some_password
database_gde_tiles:
host: host.somewhere.xx
dbname: some_database_name
username: some_username
password: some_password
......@@ -10,3 +10,8 @@ database_built_up:
dbname: some_database_name
username: some_username
password: some_password
database_gde_tiles:
host: host.somewhere.xx
dbname: some_database_name
username: some_username
password: some_password
DROP TABLE IF EXISTS aggregated_sources;
CREATE TABLE aggregated_sources
(
aggregated_source_id SERIAL PRIMARY KEY,
name varchar,
format varchar
);
INSERT INTO aggregated_sources(name, format)
VALUES ('first_source', 'aaa'),
('second_source', 'bbb'),
('third_source', 'ccc'),
('first_source', 'ddd');
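The seed data deliberately inserts 'first_source' twice so that the "Error" branch of write_source_to_database can be exercised. A quick way to inspect the seeded table, assuming db_test is an open Database connection as in the test further down:

# Sketch only: count how often each name occurs in the seeded table
db_test.cursor.execute(
    "SELECT name, COUNT(*) FROM aggregated_sources GROUP BY name ORDER BY name;"
)
for name, count in db_test.cursor.fetchall():
    print(name, count)  # 'first_source' appears twice; the other names once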
......@@ -21,7 +21,8 @@ import pytest
import pandas
import numpy
from gdeimporter.configuration import Configuration
from gdeimporter.aggregatedexposuremodel import ExposureModelESRM20
from gdeimporter.aggregatedexposuremodel import AggregatedExposureModel, ExposureModelESRM20
from gdeimporter.tools.database import Database
def test_ExposureModelESRM20():
......@@ -249,4 +250,73 @@ def test_ExposureModelESRM20():
returned_aem.get_data_units(config, "Entity_3", "residential")
assert "FileNotFoundError" in str(excinfo.type)
# TO DO: test that the geometries are retrieved and stored correctly
def test_AggregatedExposureModel_write_source_to_database(test_db):
# Database connection (the Configuration class will define the credentials based on whether
# the code is running in the CI or locally)
config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
)
db_test = Database(**config.database_gde_tiles)
db_test.create_connection_and_cursor()
# Test case in which the name to add does not already exist in aggregated_sources
returned_aem = AggregatedExposureModel(config)
returned_source_id, returned_writing_mode = returned_aem.write_source_to_database(
db_test.cursor, "aggregated_sources"
)
assert returned_source_id == 5
assert returned_writing_mode == "Inserted"
returned_name, returned_format = query_aggregated_sources(db_test.cursor, 5)
assert returned_name == "esrm20"
assert returned_format == "esrm20"
# Test case in which the name to add already exists in aggregated_sources:
config.model_name = "second_source"
config.exposure_format = "vvv"
returned_aem = AggregatedExposureModel(config)
returned_source_id, returned_writing_mode = returned_aem.write_source_to_database(
db_test.cursor, "aggregated_sources"
)
assert returned_source_id == 2
assert returned_writing_mode == "Updated"
returned_name, returned_format = query_aggregated_sources(db_test.cursor, 2)
assert returned_name == "second_source"
assert returned_format == "vvv"
# Test case in which the name to add already exists twice in aggregated_sources:
config.model_name = "first_source"
config.exposure_format = "xxx"
returned_aem = AggregatedExposureModel(config)
returned_source_id, returned_writing_mode = returned_aem.write_source_to_database(
db_test.cursor, "aggregated_sources"
)
assert numpy.isnan(returned_source_id)
assert returned_writing_mode == "Error"
db_test.close_connection()
def query_aggregated_sources(db_cursor, source_id):
"""This auxiliary function queries the test database through an open cursor (db_cursor) to
retrieve the contents of the fields name and format from the table aggregated_sources,
searching by aggregated_source_id = source_id.
"""
sql_command = (
"SELECT name, format FROM aggregated_sources WHERE aggregated_source_id='%s';"
% (str(source_id),)
)
db_cursor.execute(sql_command)
exec_result = db_cursor.fetchall()
return exec_result[0][0], exec_result[0][1]
......@@ -40,6 +40,11 @@ def test_Configuration():
assert returned_config.database_built_up["dbname"] == "some_database_name"
assert returned_config.database_built_up["username"] == "some_username"
assert returned_config.database_built_up["password"] == "some_password"
assert len(returned_config.database_gde_tiles.keys()) == 4
assert returned_config.database_gde_tiles["host"] == "host.somewhere.xx"
assert returned_config.database_gde_tiles["dbname"] == "some_database_name"
assert returned_config.database_gde_tiles["username"] == "some_username"
assert returned_config.database_gde_tiles["password"] == "some_password"
# Test case in which the file is not found
with pytest.raises(OSError) as excinfo:
......
......@@ -24,15 +24,11 @@ import pandas
import shapely
import pytest
from copy import deepcopy
from dotenv import load_dotenv
from pathlib import Path
from gdeimporter.tools.data_unit_tiles import DataUnitTilesHelper
from gdeimporter.configuration import Configuration
logger = logging.getLogger()
load_dotenv(Path(".env").resolve())
def test_get_quadtiles_in_bbox_of_polygon():
# Read shapefile with input polygons to test
......@@ -252,7 +248,7 @@ def test_get_data_unit_tiles():
)
def test_define_data_unit_tiles_and_attributes(built_up_area_test_db):
def test_define_data_unit_tiles_and_attributes(test_db):
# Read shapefile with input polygon to test
input_polygons = geopandas.read_file(
os.path.join(
......@@ -396,7 +392,7 @@ def test_get_area_albers_equal_area():
assert round(returned_area, 0) == round(expected_area, 0)
def test_retrieve_built_up_area(built_up_area_test_db):
def test_retrieve_built_up_area(test_db):
# Quadkeys whose built-up area will be retrieved
quadkeys = numpy.array(
[
......