Commit ad4132d3 authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Added feature to read built-up areas from the OBM Tiles database

parent 6464af0f
Pipeline #28143 passed with stage
in 1 minute and 58 seconds
......@@ -6,6 +6,7 @@ Pipfile
Pipfile.lock
.idea
.coverage
*.env
__pycache__
.cache
......
image: python:3-buster
services:
- postgres:13.3
# Make pip cache the installed dependencies
variables:
PIP_CACHE_DIR: "$CI_PROJECT_DIR/.cache/pip"
POSTGRES_DB: testdatabase
POSTGRES_USER: tester
POSTGRES_PASSWORD: somepass
GDEIMPORTER_DB_HOST: postgres
GDEIMPORTER_DB: ${POSTGRES_DB}
GDEIMPORTER_USER: ${POSTGRES_USER}
GDEIMPORTER_PASSWORD: ${POSTGRES_PASSWORD}
GDEIMPORTER_SOURCEID: 1
cache:
paths:
- .cache/pip
......
......@@ -5,3 +5,10 @@ boundaries_pathname: path_to_directory_with_boundary_files
occupancies_to_run: residential, commercial # Need to exist for the indicated `exposure format`, industrial not supported
exposure_entities_to_run: all # Either "all", a comma-space-separated list of entity names, or a name of a .txt or .csv file
number_cores: 1 # Number of cores used for parallelisation
database_built_up: # Database where built-up areas per quadtile are stored
host: host_name
dbname: database_name
port: port_number # Leave empty if a port number is not needed
username: username
password: password_of_username
sourceid: 1
......@@ -16,8 +16,10 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import os
import logging
import yaml
from dotenv import load_dotenv
logger = logging.getLogger()
......@@ -42,6 +44,22 @@ class Configuration:
List of names of the exposure entities for which the data units will be retrieved.
self.number_cores (int):
Number of cores that will be used to run the code.
self.database_built_up (dict):
Dictionary containing the credentials needed to connect to the SQL database in which
the built-up area per quadtile is stored. The exact parameters needed depend on the
database. They can be:
host (str):
SQL database host address.
dbname (str):
Name of the SQL database.
port (int):
Port where the SQL database can be found.
username (str):
User name to connect to the SQL database.
password (str):
Password associated with self.username.
sourceid (int):
ID of the built-up area source dataset that will be sought for.
"""
REQUIRES = [
......@@ -50,13 +68,20 @@ class Configuration:
"occupancies_to_run",
"exposure_entities_to_run",
"number_cores",
"database_built_up",
]
def __init__(self, filepath):
def __init__(self, filepath, force_config_over_hierarchies=False):
"""
Args:
filepath (str):
Full file path to the .yml configuration file.
force_config_over_hierarchies (bool):
If True, the contents of the .yml configuration file specified in filepath will
take precedence over any other hierarchy (e.g. preference of environment
variables if they exist). If False, hierarchies of preference established in
this class are applied. This parameter is used for forcing the testing of this
class under certain circumstances. Default: False.
"""
config = self.read_config_file(filepath)
......@@ -71,6 +96,9 @@ class Configuration:
config, "exposure_entities_to_run"
)
self.number_cores = self._assign_integer_parameter(config, "number_cores")
self.database_built_up = self._retrieve_database_credentials(
config, "database_built_up", "test_db_built_up.env", force_config_over_hierarchies
)
# Terminate if critical parameters are missing (not all parameters are critical)
for key_parameter in self.REQUIRES:
......@@ -242,3 +270,124 @@ class Configuration:
assigned_parameter = None
return assigned_parameter
def _assign_hierarchical_parameters(self, config, input_parameter, requested_nested=None):
    """This function searches for the key input_parameter in the dictionary config, and for
    each of the elements of requested_nested as keys of config[input_parameter].

    If input_parameter is not a key of config, the output is None.
    If input_parameter is a key of config, but one of the elements of requested_nested is
    not a key of config[input_parameter], the output is None.

    Args:
        config (dict):
            The configuration file read as a dictionary. It may be an empty dictionary.
        input_parameter (str):
            Name of the desired parameter, to be searched for as a primary key of config.
        requested_nested (list of str or None):
            List of the names of the desired nested parameters, to be searched for as keys
            of config[input_parameter]. If None or empty, the function will retrieve all
            nested parameters available in config[input_parameter].

    Returns:
        assigned_parameter (dict or None):
            The content of config[input_parameter], if input_parameter is a key of config
            and all elements of requested_nested are keys of config[input_parameter], or
            None otherwise.
    """
    # None (not []) as the default: a mutable default list would be shared across
    # calls of this method.
    assigned_parameter = self._assign_parameter(config, input_parameter)
    if not isinstance(assigned_parameter, dict):
        # Covers both "input_parameter missing" (None) and "value is not a mapping".
        return None
    if not requested_nested:
        # Empty/None request means "validate everything that is present".
        requested_nested = list(assigned_parameter.keys())
    missing_sub_parameters = [
        requested_parameter
        for requested_parameter in requested_nested
        if requested_parameter not in assigned_parameter
    ]
    for requested_parameter in missing_sub_parameters:
        logger.critical(
            "ERROR instantiating Configuration: parameter '%s' does not "
            "exist in %s" % (requested_parameter, input_parameter)
        )
    if missing_sub_parameters:
        return None
    return assigned_parameter
def _retrieve_database_credentials(
    self, config, input_parameter, env_filename, force_config_over_hierarchies
):
    """This function retrieves the credentials needed to (later) connect to a specific SQL
    database. If force_config_over_hierarchies is False, it does so hierarchically, by
    giving top priority to environment variables that are created when running the CI
    Pipeline, second priority to environment variables that are created locally if a .env
    file with name env_filename is provided and, finally, by looking at what has been
    indicated in the input configuration file (read as config).

    When force_config_over_hierarchies is True, it does not matter where the code is
    running, it will always retrieve the credentials from the configuration file.

    Args:
        config (dict):
            The configuration file read as a dictionary. It may be an empty dictionary.
        input_parameter (str):
            Name of the desired parameter, to be searched for as a primary key of config.
        env_filename (str):
            Name of a local .env file that will be run to load environment variables.
        force_config_over_hierarchies (bool):
            If True, the contents of the .yml configuration file specified in filepath will
            take precedence over any other hierarchy (e.g. preference of environment
            variables if they exist). If False, hierarchies of preference established in
            this class are applied. This parameter is used for forcing the testing of this
            class under certain circumstances.

    Returns:
        db_config (dict or None):
            Dictionary containing the credentials needed to connect to the desired SQL
            database, or None if they cannot be retrieved. The credentials can be:
                host (str):
                    SQL database host address.
                dbname (str):
                    Name of the SQL database.
                port (int):
                    Port where the SQL database can be found (absent if not provided).
                username (str):
                    User name to connect to the SQL database.
                password (str):
                    Password associated with username.
                sourceid (int):
                    ID of the built-up area source dataset that will be sought for.
    """
    db_config = self._assign_hierarchical_parameters(config, input_parameter)
    if "GDEIMPORTER_DB_HOST" in os.environ and not force_config_over_hierarchies:
        # Running the CI pipeline: credentials come from the pipeline variables
        db_config = {
            "host": os.environ.get("GDEIMPORTER_DB_HOST"),
            "dbname": os.environ.get("GDEIMPORTER_DB"),
            "username": os.environ.get("GDEIMPORTER_USER"),
            "password": os.environ.get("GDEIMPORTER_PASSWORD"),
            "sourceid": os.environ.get("GDEIMPORTER_SOURCEID"),
        }
    elif os.path.isfile(env_filename) and not force_config_over_hierarchies:
        # Testing locally with a test database whose credentials are in env_filename
        load_dotenv(env_filename)
        db_config = {
            "host": os.environ.get("GDEIMPORTER_LOCAL_DB_HOST"),
            "dbname": os.environ.get("GDEIMPORTER_LOCAL_DB"),
            "username": os.environ.get("GDEIMPORTER_LOCAL_USER"),
            "password": os.environ.get("GDEIMPORTER_LOCAL_PASSWORD"),
            "sourceid": os.environ.get("GDEIMPORTER_LOCAL_SOURCEID"),
        }
    # Normalise the port (if given) to an integer. NOTE: the original code checked
    # "port" against 'config' (the whole configuration), not 'db_config', so the
    # conversion never ran; it also has to tolerate an empty value, because the
    # configuration file documents the port as "leave empty if not needed".
    if db_config is not None and db_config.get("port") not in (None, ""):
        db_config["port"] = int(db_config["port"])
    return db_config
......@@ -18,6 +18,7 @@
import logging
from multiprocessing import Pool
from functools import partial
from gdeimporter.tools.data_unit_tiles import DataUnitTilesHelper
......@@ -80,7 +81,9 @@ class ExposureEntity:
self.name = name
self.occupancy_cases = {}
def create_data_unit_tiles(self, occupancy_case, number_cores):
def create_data_unit_tiles(
self, occupancy_case, number_cores, db_built_up_config, db_table
):
"""This function creates the data-unit tiles associated with all data units of the
ExposureEntity for a specified 'occupancy_case'. The latter needs to be a key of
self.occupancy_cases. The data units will be parallelised into as many cores as
......@@ -94,6 +97,30 @@ class ExposureEntity:
self.occupancy_cases.
number_cores (int):
Number of CPU cores to be used to run this function.
db_built_up_config (dict):
Dictionary containing the credentials needed to connect to the database where
the built-up area values are stored. The keys of the dictionary need to be:
host (str):
SQL database host address.
dbname (str):
Name of the SQL database.
port (int):
Port where the SQL database can be found.
username (str):
User name to connect to the SQL database.
password (str):
Password associated with username.
sourceid (int):
ID of the built-up area source dataset that will be sought for.
db_table (str):
Name of the table of the SQL database where the built-up area values are stored.
It is assumed that this table contains, at least, the following fields:
quadkey (str):
String indicating the quadkey of a tile.
built_area_size (float):
Value of the built-up area to be retrieved.
source_id (int):
ID of the source used to define the built-up area.
Returns:
This function writes the 'data_unit_tiles' attribute of the data units of the
......@@ -113,9 +140,12 @@ class ExposureEntity:
]
p = Pool(processes=number_cores)
all_data_unit_tiles = p.map(
DataUnitTilesHelper.define_data_unit_tiles_and_attributes, data_units_geoms
func = partial(
DataUnitTilesHelper.define_data_unit_tiles_and_attributes,
db_built_up_config,
db_table,
)
all_data_unit_tiles = p.map(func, data_units_geoms)
p.close()
p.join()
......
......@@ -55,7 +55,10 @@ def main():
for occupancy_case in config.occupancies_to_run:
aem.get_data_units(config, exposure_entity_name, occupancy_case)
aem.exposure_entities[exposure_entity_name].create_data_unit_tiles(
occupancy_case, config.number_cores
occupancy_case,
config.number_cores,
config.database_built_up,
"obm_built_area_assessments",
)
print("Name of the model: %s" % (aem.model_name))
......
......@@ -24,6 +24,7 @@ import pyproj
import mercantile
import shapely
from copy import deepcopy
from gdeimporter.tools.database import Database
logger = logging.getLogger()
......@@ -100,12 +101,36 @@ class DataUnitTilesHelper:
return data_unit_tiles, filtered_quadtiles
@staticmethod
def define_data_unit_tiles_and_attributes(in_geometry):
def define_data_unit_tiles_and_attributes(db_built_up_config, db_table, in_geometry):
"""This function defines the data-unit tiles associated with 'in_geometry' and their
respective attributes. Data-unit tiles are defined as the intersection between zoom
level 18 quadtiles and 'in_geometry'.
Args:
db_built_up_config (dict):
Dictionary containing the credentials needed to connect to the database where
the built-up area values are stored. The keys of the dictionary need to be:
host (str):
SQL database host address.
dbname (str):
Name of the SQL database.
port (int):
Port where the SQL database can be found.
username (str):
User name to connect to the SQL database.
password (str):
Password associated with username.
sourceid (int):
ID of the built-up area source dataset that will be sought for.
db_table (str):
Name of the table of the SQL database where the built-up area values are stored.
It is assumed that this table contains, at least, the following fields:
quadkey (str):
String indicating the quadkey of a tile.
built_area_size (float):
Value of the built-up area to be retrieved.
db_source_id (int):
ID of the source used to define the built-up area.
in_geometry (Shapely Polygon or MultiPolygon):
Geometry for which the associated data-unit tiles will be defined.
......@@ -164,7 +189,9 @@ class DataUnitTilesHelper:
# Write values to 'data_unit_tiles'
data_unit_tiles["size_data_unit_tile_area"] = all_areas
data_unit_tiles["size_data_unit_tile_built_up_area"] = (
DataUnitTilesHelper.retrieve_built_up_area(data_unit_tiles["quadkey"].values)
DataUnitTilesHelper.retrieve_built_up_area(
data_unit_tiles["quadkey"].values, db_built_up_config, db_table
)
* all_fractions
)
data_unit_tiles["fraction_data_unit_area"] = all_areas / all_areas.sum()
......@@ -463,14 +490,72 @@ class DataUnitTilesHelper:
return geometry.area
@staticmethod
def retrieve_built_up_area(quadkeys, db_built_up_config, db_table):
    """This function retrieves the built-up area associated with the input quadkeys from
    the OBM Tiles database.

    Args:
        quadkeys (array of str):
            Quadkeys for which the built-up areas will be retrieved.
        db_built_up_config (dict):
            Dictionary containing the credentials needed to connect to the database where
            the built-up area values are stored. The keys of the dictionary need to be:
                host (str):
                    SQL database host address.
                dbname (str):
                    Name of the SQL database.
                port (int):
                    Port where the SQL database can be found.
                username (str):
                    User name to connect to the SQL database.
                password (str):
                    Password associated with username.
                sourceid (int):
                    ID of the built-up area source dataset that will be sought for.
            It is assumed that the database contains a table with name db_table.
        db_table (str):
            Name of the table of the SQL database where the built-up area values are stored.
            It is assumed that this table contains, at least, the following fields:
                quadkey (str):
                    String indicating the quadkey of a tile.
                built_area_size (float):
                    Value of the built-up area to be retrieved.
                source_id (int):
                    ID of the source used to define the built-up area.

    Returns:
        built_up_areas (array of float):
            Built-up area values associated with the quadkeys. A quadkey without any
            entry in the database yields 0.0; a quadkey with more than one matching
            entry yields numpy.nan (and an error is logged).
    """
    built_up_areas = numpy.zeros([len(quadkeys)], dtype=float)
    # Create Database instance and establish the connection and cursor
    db_built_up_areas = Database(**db_built_up_config)
    db_built_up_areas.create_connection_and_cursor()
    # The table name cannot be passed as a bound parameter, so it is interpolated
    # once; quadkey and source_id are bound per execution (psycopg2 placeholders)
    # instead of being formatted into the SQL string, avoiding SQL injection.
    sql_query = (
        "SELECT built_area_size FROM %s WHERE (quadkey=%%s AND source_id=%%s);" % db_table
    )
    source_id = db_built_up_config["sourceid"]
    try:
        for i, quadkey in enumerate(quadkeys):
            # Query the database
            db_built_up_areas.cursor.execute(sql_query, (quadkey, source_id))
            exec_result = db_built_up_areas.cursor.fetchall()
            # Interpret the output of the query
            if len(exec_result) == 0:  # If the quadkey is not found the built-up area is zero
                built_up_areas[i] = 0.0
            elif len(exec_result) == 1:
                built_up_areas[i] = exec_result[0][0]
            else:  # More than one entry found, this is an error
                logger.error(
                    "ERROR IN retrieve_built_up_area: "
                    "MORE THAN ONE ENTRY FOUND FOR QUADKEY %s" % (quadkey)
                )
                built_up_areas[i] = numpy.nan
    finally:
        # Always release the connection, even if a query raises
        db_built_up_areas.close_connection()
    return built_up_areas
#!/usr/bin/env python3
# Copyright (C) 2021:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import logging
import psycopg2
logger = logging.getLogger()
class Database:
    """This class contains the credentials needed to connect to a given SQL database.

    Attributes:
        self.host (str):
            SQL database host address.
        self.dbname (str):
            Name of the SQL database.
        self.port (int):
            Port where the SQL database can be found. Defaults to 5432 (the
            PostgreSQL default) when not provided, None or an empty string.
        self.username (str):
            User name to connect to the SQL database.
        self.password (str):
            Password associated with self.username.
        self.connection (psycopg2.extensions.connection or None):
            Object that handles the connection to the SQL database. None until
            create_connection_and_cursor() is called.
        self.cursor (psycopg2.extensions.cursor or None):
            Cursor object that allows to execute commands on the SQL database. None
            until create_connection_and_cursor() is called.
    """

    def __init__(self, dbname, username, password, host, port=5432, **kwargs):
        """This function initialises the class.

        Args:
            dbname (str):
                Name of the SQL database.
            username (str):
                User name to connect to the SQL database.
            password (str):
                Password associated with username.
            host (str):
                SQL database host address.
            port (int):
                Port where the SQL database can be found. Optional; empty or None
                values fall back to 5432.
            **kwargs:
                Ignored. Allows a full credentials dictionary (e.g. one that also
                carries 'sourceid') to be passed directly as Database(**config).
        """
        self.host = host
        self.dbname = dbname
        # Fall back to the PostgreSQL default when the port is left empty, so that
        # the connection string never contains a dangling "port=" fragment
        self.port = port if port not in (None, "") else 5432
        self.username = username
        self.password = password
        # Initialise other attributes; set by create_connection_and_cursor()
        self.connection = None
        self.cursor = None

    def create_connection_and_cursor(self):
        """Create a database connection and a cursor with the given credentials and store the
        connection and cursor as new attributes.

        Returns:
            self.connection and self.cursor are updated.
        """
        connection_string = "host={} dbname={} user={} password={} port={}".format(
            self.host, self.dbname, self.username, self.password, self.port
        )
        connection = psycopg2.connect(connection_string)
        connection.set_session(autocommit=True)
        self.connection = connection
        self.cursor = connection.cursor()

    def close_connection(self):
        """Close the cursor and the connection to the database.

        Safe to call when no connection was ever opened, and idempotent: both
        attributes are reset to None after closing.
        """
        if self.cursor is not None:
            self.cursor.close()
            self.cursor = None
        if self.connection is not None:
            self.connection.close()
            self.connection = None
......@@ -38,6 +38,8 @@ setup(
"mercantile",
"pyproj",
"shapely",
"psycopg2-binary",
"python-dotenv",
],
extras_require={
"tests": tests_require,
......
#!/usr/bin/env python3
# Copyright (C) 2021:
# Helmholtz-Zentrum Potsdam Deutsches GeoForschungsZentrum GFZ
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.
import os
import pytest
from dotenv import load_dotenv
from pathlib import Path
from gdeimporter.tools.database import Database
load_dotenv(Path(".env").resolve())
@pytest.fixture
def built_up_area_test_db():
    """Fixture that sets up the test database standing in for obm_built_area_assessments.

    Delegates all schema creation and data loading to init_built_up_db(); the
    fixture itself yields no value.
    """
    init_built_up_db()
def init_built_up_db():
    """Populates the test database that simulates to be obm_built_area_assessments with a basic
    schema and data.

    The schema/data commands are read from tests/data/test_database_built_up.sql and
    executed one by one. Only runs when the CI-pipeline environment variables are
    present; otherwise there is no database service to initialise and the function
    returns without doing anything.
    """
    if "GDEIMPORTER_DB_HOST" not in os.environ:
        # Not running the CI pipeline: no test Postgres service is available
        return
    db_built_up_config = {
        "host": os.environ.get("GDEIMPORTER_DB_HOST"),
        "dbname": os.environ.get("GDEIMPORTER_DB"),
        # NOTE(review): empty string relies on the connection layer falling back to
        # the default PostgreSQL port — confirm against Database.__init__
        "port": "",
        "username": os.environ.get("GDEIMPORTER_USER"),
        "password": os.environ.get("GDEIMPORTER_PASSWORD"),
    }
    # Create Database instance and establish the connection and cursor
    db = Database(**db_built_up_config)
    db.create_connection_and_cursor()
    try:
        # Create columns and populate the test database
        with open("tests/data/test_database_built_up.sql", "r") as file:
            for command in file.read().split(";"):
                # split(";") yields empty/whitespace-only fragments (e.g. after the
                # final semicolon); executing those would raise, so skip them
                if command.strip():
                    db.cursor.execute(command)
    finally:
        db.close_connection()
......@@ -5,3 +5,8 @@ boundaries_pathname: /some/path/to/directory
occupancies_to_run: residential, commercial, industrial
exposure_entities_to_run: all
number_cores: some
database_built_up:
host: host.somewhere.xx
dbname: some_database_name
username: some_username
password: some_password
model_name: esrm20
exposure_format: esrm20