Commit b22b3bc8 authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Added feature to retrieve ID of aggregated source name in config file

parent 592fd663
Pipeline #40455 passed with stage
in 1 minute and 41 seconds
image: python:3.9-bullseye
services:
- name: $CI_REGISTRY/dynamicexposure/server-components/containers/docker-obm-database:master
alias: postgres
# Make pip cache the installed dependencies
variables:
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
POSTGRES_DB: testdatabase
POSTGRES_USER: tester
POSTGRES_PASSWORD: somepass
GDEIMPORTER_DB_HOST: postgres
GDEIMPORTER_DB: ${POSTGRES_DB}
GDEIMPORTER_USER: ${POSTGRES_USER}
GDEIMPORTER_PASSWORD: ${POSTGRES_PASSWORD}
cache:
paths:
- .cache/pip
......
model_name: esrm20 # Needs to exist in 'aggregated_sources' database table
database_gde_tiles: # Database where info on the GDE tiles is stored
host: host_name
dbname: database_name
port: port_number # Leave empty if a port number is not needed
username: username
password: password_of_username
#!/usr/bin/env python3
# Copyright (C) 2021:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import logging
from gdeimporter.tools.configuration_methods import ConfigurationMethods
logger = logging.getLogger()
class Configuration:
"""This class handles the configuration parameters of the gde-core.
Attributes:
self.model_name (str):
Name of the input aggregated model.
self.database_gde_tiles (dict):
Dictionary containing the credentials needed to connect to the SQL database in which
information on the GDE tiles is stored. The exact parameters needed depend on the
database. They can be:
host (str):
SQL database host address.
dbname (str):
Name of the SQL database.
port (int):
Port where the SQL database can be found.
username (str):
User name to connect to the SQL database.
password (str):
Password associated with self.username.
"""
REQUIRES = [
"model_name",
"database_gde_tiles",
]
def __init__(self, filepath, force_config_over_hierarchies=False):
"""
Args:
filepath (str):
Full file path to the .yml configuration file.
force_config_over_hierarchies (bool):
If True, the contents of the .yml configuration file specified in filepath will
take precedence over any other hierarchy (e.g. preference of environment
variables if they exist). If False, hierarchies of preference established in
this class are applied. This parameter is used for forcing the testing of this
class under certain circumstances. Default: False.
"""
config = ConfigurationMethods.read_config_file(filepath)
self.model_name = ConfigurationMethods.assign_parameter(config, "model_name")
self.database_gde_tiles = ConfigurationMethods.retrieve_database_credentials(
config, "database_gde_tiles", "test_db_gde_tiles.env", force_config_over_hierarchies
)
# Terminate if critical parameters are missing (not all parameters are critical)
for key_parameter in self.REQUIRES:
if getattr(self, key_parameter) is None:
error_message = (
"Error: parameter '%s' could not be retrieved from "
"configuration file. The program cannot run." % (key_parameter)
)
logger.critical(error_message)
raise OSError(error_message)
#!/usr/bin/env python3
# Copyright (C) 2022:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import logging
from gdeimporter.tools.database import Database
logger = logging.getLogger()
class DatabaseQueries:
"""This class contains methods used to query the OpenBuildingMap (OBM) and Global Dynamic
Exposure (GDE) databases.
"""
@staticmethod
def retrieve_aggregated_source_id(model_name, db_gde_tiles_config, db_table):
"""This function retrieves the ID of the aggregated exposure model source whose name is
'model_name'.
Args:
model_name (str):
Name of the source whose ID is to be retrieved.
db_gde_tiles_config (dict):
Dictionary containing the credentials needed to connect to the SQL database in
which information on the aggregated_sources is stored. The keys of the
dictionary need to be:
host (str):
SQL database host address.
dbname (str):
Name of the SQL database.
port (int):
Port where the SQL database can be found.
username (str):
User name to connect to the SQL database.
password (str):
Password associated with self.username.
db_table (str):
Name of the table of the SQL database where the aggregated_sources are stored.
It is assumed that this table contains, at least, the following fields:
aggregated_source_id (int):
ID of the source of the aggregated exposure model.
name (str):
Name of the source of the aggregated exposure model.
Returns:
aggregated_source_id (int):
ID of the source of the aggregated exposure model with name 'model_name'. If
'model_name' is not found, 'aggregated_source_id' is -999.
"""
sql_query = "SELECT aggregated_source_id FROM %s WHERE name='%s';"
db_gde_tiles = Database(**db_gde_tiles_config)
db_gde_tiles.create_connection_and_cursor()
db_gde_tiles.cursor.execute(sql_query % (db_table, model_name))
exec_result = db_gde_tiles.cursor.fetchall()
db_gde_tiles.close_connection()
if len(exec_result) == 1: # Entry exists --> retrieve
aggregated_source_id = exec_result[0][0]
else: # More than one entries found, this is an error
logger.error(
"Error in retrieve_aggregated_source_id: "
"more than one or no entry found for name = %s" % (model_name)
)
aggregated_source_id = -999
return aggregated_source_id
......@@ -18,6 +18,8 @@
import logging
import sys
from gdecore.configuration import Configuration
from gdecore.database_queries import DatabaseQueries
# Add a logger printing error, warning, info and debug messages to the screen
logger = logging.getLogger()
......@@ -26,10 +28,31 @@ logger.addHandler(logging.StreamHandler(sys.stdout))
def main():
"""Run the gde-core."""
# Log the start of the run
logger.info("gde-core has started")
# Read configuration parameters
config = Configuration("config.yml")
aggregated_source_id = DatabaseQueries.retrieve_aggregated_source_id(
config.model_name,
config.database_gde_tiles,
"aggregated_sources",
)
if aggregated_source_id < 0:
error_message = (
"Error while attempting to retrieve the ID of aggregated exposure model with name "
"'%s': more than one or no entries were found." % (config.model_name)
)
raise OSError(error_message)
logger.info(
"aggregated_source_id of aggregated exposure model with name '%s' retrieved: %s"
% (config.model_name, aggregated_source_id)
)
# Leave the program
logger.info("gde-core has finished")
sys.exit()
......
......@@ -28,7 +28,11 @@ setup(
keywords="Global Dynamic Exposure, GDE, buildings, exposure model",
author="Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ",
license="AGPLv3+",
install_requires=["numpy"],
install_requires=[
"numpy",
# pylint: disable=line-too-long
"gdeimporter@git+https://git.gfz-potsdam.de/dynamicexposure/globaldynamicexposure/gde-importer.git", # noqa: E501
],
extras_require={
"tests": tests_require,
"linters": linters_require,
......
#!/usr/bin/env python3
# Copyright (C) 2022:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import os
from pathlib import Path
import pytest
from dotenv import load_dotenv
from gdeimporter.tools.database import Database
load_dotenv(Path(".env").resolve())
@pytest.fixture
def test_db():
"""A test database simulating to contain the following tables:
- aggregated_sources (of the GDE Tiles database)
"""
init_test_db()
return
def init_test_db():
"""Populates the test database."""
if "GDEIMPORTER_DB_HOST" in os.environ: # When running the CI pipeline
db_built_up_config = {
"host": os.environ.get("GDEIMPORTER_DB_HOST"),
"dbname": os.environ.get("GDEIMPORTER_DB"),
"port": "",
"username": os.environ.get("GDEIMPORTER_USER"),
"password": os.environ.get("GDEIMPORTER_PASSWORD"),
}
# Create Database instance and establish the connection and cursor
db = Database(**db_built_up_config)
db.create_connection_and_cursor()
# Create columns and populate the tables
with open("tests/data/test_database_set_up.sql", "r") as file:
for command in file.read().split(";"):
if command != "\n":
db.cursor.execute(command)
db.close_connection()
model_name: esrm20
database_gde_tiles:
host: host.somewhere.xx
dbname: some_database_name
username: some_username
password: some_password
DROP TABLE IF EXISTS aggregated_sources;
CREATE TABLE aggregated_sources
(
aggregated_source_id SERIAL PRIMARY KEY,
name varchar,
format varchar
);
INSERT INTO aggregated_sources(name, format)
VALUES ('esrm20', 'esrm20'),
('second_source', 'bbb'),
('third_source', 'ccc'),
('first_source', 'ddd');
#!/usr/bin/env python3
# Copyright (C) 2022:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import os
import pytest
from gdecore.configuration import Configuration
def test_Configuration():
# Test the non-trivial case using a test data file
returned_config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml"),
force_config_over_hierarchies=True,
)
assert returned_config.model_name == "esrm20"
assert len(returned_config.database_gde_tiles.keys()) == 4
assert returned_config.database_gde_tiles["host"] == "host.somewhere.xx"
assert returned_config.database_gde_tiles["dbname"] == "some_database_name"
assert returned_config.database_gde_tiles["username"] == "some_username"
assert returned_config.database_gde_tiles["password"] == "some_password"
# Test case in which the file is not found
with pytest.raises(OSError) as excinfo:
returned_config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "doesnotexist.yml"),
force_config_over_hierarchies=True,
)
assert "OSError" in str(excinfo.type)
#!/usr/bin/env python3
# Copyright (C) 2022:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import os
from gdecore.configuration import Configuration
from gdecore.database_queries import DatabaseQueries
def test_retrieve_aggregated_source_id(test_db):
# Database connection (the Configuration class will define the credentials based on whether
# the code is running in the CI or locally)
config = Configuration(
os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
)
returned_aggregated_source_id = DatabaseQueries.retrieve_aggregated_source_id(
"second_source", config.database_gde_tiles, "aggregated_sources"
)
assert returned_aggregated_source_id == 2
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment