Commit aaa543c8 authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Added functionality to erase existing entries in gde_buildings table

parent cfcc5133
Pipeline #46654 passed with stage
in 2 minutes and 18 seconds
......@@ -77,10 +77,32 @@ Data units for which no geometry is available (which should not occur unless an
error occurred while running the `gde-importer`) are logged with a warning, as they cannot be
processed.
If no data units with geometries are retrieved (i.e. they don't exist in the database), a
warning is logged and the rest of the code is not run for this combination of exposure entity
and occupancy case (i.e. the code moves on to the next occupancy case).
<img src="images/gde_core_algorithm_01.png" width=75%>
Fig. 4.1 Flowchart showing the processing logic at the highest level.
### Deleting Existing Entries from GDE Buildings Database Table
The `delete_old_database_entries` method of the `DatabaseStorage` class is called to delete all
entries associated with the exposure entity and occupancy case (and aggregated_source_id) being
run, from the `gde_buildings` table of the
[GDE Tiles database](https://git.gfz-potsdam.de/dynamicexposure/globaldynamicexposure/database-gdetiles).
This step is relevant because, even though the method that writes data to the `gde_buildings`
table does delete existing entries for a specific combination of `data_unit_id`, `occupancy_case`
and `aggregated_source_id` before proceeding to write the new entries, the actual data unit IDs
associated with an exposure entity may change for different versions of the aggregated exposure
model, and it may happen that the user wants to treat different versions under the same
`aggregated_source_id`. In such a case, data associated with old data units that no longer exist
needs to be eliminated.
Note that if no new data unit IDs with geometries are identified in the previous step, the
`delete_old_database_entries` method is not run (i.e. existing entries are not erased).
### Process Data Units in Parallel
Data units are processed using the `process_data_unit` method of the
......
......@@ -363,3 +363,74 @@ class DatabaseStorage:
db_gde_tiles.close_connection()
return
@staticmethod
def delete_old_database_entries(
    db_gde_tiles_config,
    db_table,
    exposure_entity,
    occupancy_case,
    aggregated_source_id,
):
    """This function deletes all entries associated with this combination of
    'exposure_entity', 'occupancy_case' and 'aggregated_source_id' from table 'db_table' of
    the database whose credentials are indicated in 'db_gde_tiles_config'.

    The function assumes that the table 'db_table' contains a field called 'data_unit_id'
    and that the IDs of the data units start with the 3-character code of the exposure
    entities. It searches for 'exposure_entity' under this assumption, i.e. only data unit
    IDs that begin with 'exposure_entity' are matched.

    The values are interpolated directly into the SQL command, so all arguments must come
    from trusted sources (e.g. the configuration file), never from unvalidated user input.

    Args:
        db_gde_tiles_config (dict):
            Dictionary containing the credentials needed to connect to the SQL database in
            which table 'db_table' exists. The keys of the dictionary need to be:
                host (str):
                    SQL database host address.
                dbname (str):
                    Name of the SQL database.
                port (int):
                    Port where the SQL database can be found.
                username (str):
                    User name to connect to the SQL database.
                password (str):
                    Password associated with 'username'.
        db_table (str):
            Name of the table of the SQL database whose contents will be erased (for this
            particular combination of 'exposure_entity', 'occupancy_case' and
            'aggregated_source_id'). It is assumed that this table contains, at least, the
            following fields:
                data_unit_id (str):
                    ID of the data units, whose first 3 characters are the code of the
                    exposure entities to which they belong.
                occupancy_case (enum):
                    SQL enumerated type describing the building occupancy cases.
                aggregated_source_id (int):
                    ID of the source of the aggregated exposure model.
        exposure_entity (str):
            3-character code of the exposure entity whose entries will be deleted.
        occupancy_case (str):
            Name of the occupancy case (e.g. "residential", "commercial", "industrial")
            whose entries will be deleted.
        aggregated_source_id (int):
            ID of the source of the aggregated exposure model whose entries will be deleted.
    """

    # strpos() returns the 1-based position of the first occurrence of the substring (0 if
    # absent). Requiring position 1 restricts the match to IDs that *start* with the
    # exposure entity code; "> 0" would also erase data units of other exposure entities
    # whose IDs merely contain the code somewhere else (e.g. "DEF_ABC01" for "ABC").
    sql_command = "DELETE FROM %s WHERE (occupancy_case='%s' AND aggregated_source_id='%s' "
    sql_command += "AND strpos(data_unit_id, '%s') = 1);"

    db_gde_tiles = Database(**db_gde_tiles_config)
    db_gde_tiles.create_connection_and_cursor()

    try:
        db_gde_tiles.cursor.execute(
            sql_command
            % (
                db_table,
                occupancy_case,
                str(aggregated_source_id),
                exposure_entity,
            )
        )
    finally:
        # Release the connection even if the DELETE fails; the error still propagates
        db_gde_tiles.close_connection()

    return
......@@ -22,6 +22,7 @@ from multiprocessing import Pool
from functools import partial
from gdecore.configuration import Configuration
from gdecore.database_queries import DatabaseQueries
from gdecore.database_storage import DatabaseStorage
from gdecore.processor import GDEProcessor
from gdecore.occupancy_cases import OccupancyCasesESRM20
......@@ -112,6 +113,37 @@ def main():
)
)
if len(data_units_ids) == 0:
logger.warning(
"No data units found for exposure entity '%s' and occupancy case '%s'. "
"No data units will be processed for this case, and no database entries "
"will be erased for the combination (%s, %s, aggregated source ID=%s)."
% (
exposure_entity_code,
occupancy_case,
exposure_entity_code,
occupancy_case,
aggregated_source_id,
)
)
continue
# Erase existing entries of gde_buildings table for this combination of
# (exposure_entity_code, occupancy_case, aggregated_source_id)
DatabaseStorage.delete_old_database_entries(
config.database_gde_tiles,
"gde_buildings",
exposure_entity_code,
occupancy_case,
aggregated_source_id,
)
logger.info(
"Previous entries in 'gde_buildings' database table associated with "
"exposure entity '%s', occupancy case '%s' and aggregated source id %s "
"have been erased (if they existed)."
% (exposure_entity_code, occupancy_case, aggregated_source_id)
)
# Prepare data unit IDs and geometries to pass on to parallelisation
data_units_and_geometries = [
(data_units_ids[j], data_units_geometries[j])
......
......@@ -5,6 +5,7 @@ DROP TABLE IF EXISTS data_units_buildings;
DROP TABLE IF EXISTS data_unit_tiles;
DROP TABLE IF EXISTS obm_built_area_assessments;
DROP TABLE IF EXISTS gde_buildings;
DROP TABLE IF EXISTS gde_buildings_to_erase;
DROP TYPE IF EXISTS occupancycase;
DROP TYPE IF EXISTS settlement;
DROP EXTENSION IF EXISTS postgis;
......@@ -258,3 +259,43 @@ VALUES (-101010, 2, 'industrial', 'ABC_10269', '333333333333333333',
'{0.6, 0.4}',
ST_GeomFromText('POLYGON((0.0 0.0,0.002 0.0,0.002 0.003,0.0 0.003,0.0 0.0))')
);
-- Fixture table used by test_delete_old_database_entries, which calls
-- DatabaseStorage.delete_old_database_entries on it with exposure entity 'ABC',
-- occupancy case 'industrial' and aggregated source ID 2.
CREATE TABLE gde_buildings_to_erase
(
osm_id integer,
aggregated_source_id SMALLINT,
occupancy_case occupancycase,
data_unit_id VARCHAR,
quadkey CHAR(18),
building_class_names VARCHAR[],
settlement_types settlement[],
occupancy_subtypes VARCHAR[],
probabilities FLOAT[],
geometry GEOMETRY,
PRIMARY KEY (osm_id, aggregated_source_id)
);
-- Two rows that differ in the exposure entity prefix of data_unit_id:
--   osm_id -101010  -> 'ABC_10269' (the test expects this row to be erased)
--   osm_id 99999999 -> 'DEF_96201' (the test expects this row to be kept)
INSERT INTO gde_buildings_to_erase(osm_id,
aggregated_source_id,
occupancy_case,
data_unit_id,
quadkey,
building_class_names,
settlement_types,
occupancy_subtypes,
probabilities,
geometry)
VALUES (-101010, 2, 'industrial', 'ABC_10269', '333333333333333333',
'{"CLASS/X/params/H:1", "CLASS/Y/params/H:2"}',
'{"rural", "rural"}',
'{"all", "all"}',
'{0.723, 0.277}',
ST_GeomFromText('POLYGON((15.0491 37.4811,15.0494 37.4814,15.0495 37.4813,15.0497 37.4812,15.0495 37.4811,15.0494 37.4812,15.0492 37.4810,15.0491 37.4811))')
),
(99999999, 2, 'industrial', 'DEF_96201', '333333333333333333',
'{"CLASS/X/params/H:1", "CLASS/Y/params/H:2"}',
'{"rural", "rural"}',
'{"all", "all"}',
'{0.6, 0.4}',
ST_GeomFromText('POLYGON((0.0 0.0,0.002 0.0,0.002 0.003,0.0 0.003,0.0 0.0))')
);
......@@ -144,7 +144,7 @@ def test_store_OBM_building_classes(test_db):
returned_occupancy_subtypes,
returned_probabilities,
returned_geometry,
) = query_OBM_building_classes(config.database_gde_tiles, 99999999, 2)
) = query_OBM_building_classes(config.database_gde_tiles, "gde_buildings", 99999999, 2)
assert returned_data_unit_id == "ABC_10269"
assert returned_occupancy_case == "residential"
......@@ -189,7 +189,7 @@ def test_store_OBM_building_classes(test_db):
returned_occupancy_subtypes,
returned_probabilities,
returned_geometry,
) = query_OBM_building_classes(config.database_gde_tiles, 11223344, 2)
) = query_OBM_building_classes(config.database_gde_tiles, "gde_buildings", 11223344, 2)
assert returned_occupancy_case == "residential"
assert returned_data_unit_id == "ABC_10269"
......@@ -236,7 +236,7 @@ def test_store_OBM_building_classes(test_db):
returned_occupancy_subtypes,
returned_probabilities,
returned_geometry,
) = query_OBM_building_classes(config.database_gde_tiles, 99999999, 2)
) = query_OBM_building_classes(config.database_gde_tiles, "gde_buildings", 99999999, 2)
assert returned_data_unit_id is None
assert returned_occupancy_subtypes is None
......@@ -299,7 +299,7 @@ def test_store_OBM_building_classes(test_db):
returned_occupancy_subtypes,
returned_probabilities,
returned_geometry,
) = query_OBM_building_classes(config.database_gde_tiles, osm_id, 2)
) = query_OBM_building_classes(config.database_gde_tiles, "gde_buildings", osm_id, 2)
assert returned_occupancy_case == "commercial"
assert returned_data_unit_id == "ABC_10269"
......@@ -341,10 +341,76 @@ def test_store_OBM_building_classes(test_db):
)
def query_OBM_building_classes(credentials, osm_id, aggregated_source_id):
"""This auxiliary function queries the 'gde_buildings' table of the test database to
retrieve the building classes (and related attributes) associated with an OBM building with
ID 'osm_id' as per an aggregated model with 'aggregated_source_id'.
def test_delete_old_database_entries(test_db):
    """Check that 'DatabaseStorage.delete_old_database_entries' erases the entry of table
    'gde_buildings_to_erase' that belongs to exposure entity "ABC" (occupancy case
    "industrial", aggregated source ID 2) and leaves the entry of exposure entity "DEF"
    untouched.
    """

    # Database connection (the Configuration class will define the credentials based on whether
    # the code is running in the CI or locally)
    config_path = os.path.join(
        os.path.dirname(__file__), "data", "config_for_testing_good.yml"
    )
    config = Configuration(config_path)

    table_name = "gde_buildings_to_erase"
    source_id = 2

    # Expected (occupancy_case, data_unit_id, quadkey) per OSM ID before anything is erased
    expected_rows = {
        -101010: ("industrial", "ABC_10269", "333333333333333333"),
        99999999: ("industrial", "DEF_96201", "333333333333333333"),
    }

    def fetch_leading_fields(osm_id):
        # Only the first three of the eight returned fields are checked by this test
        occupancy, data_unit, quadkey, _, _, _, _, _ = query_OBM_building_classes(
            config.database_gde_tiles, table_name, osm_id, source_id
        )
        return occupancy, data_unit, quadkey

    # First query to make sure that the table has entries to be erased
    for osm_id, expected in expected_rows.items():
        assert fetch_leading_fields(osm_id) == expected

    # Call the method to erase the entries
    DatabaseStorage.delete_old_database_entries(
        config.database_gde_tiles,
        table_name,
        "ABC",
        "industrial",
        source_id,
    )

    # Make sure the desired entry has been erased
    erased_row = query_OBM_building_classes(
        config.database_gde_tiles, table_name, -101010, source_id
    )
    for field in erased_row:
        assert field is None

    # Make sure the other entry has NOT been erased
    assert fetch_leading_fields(99999999) == expected_rows[99999999]
def query_OBM_building_classes(credentials, table_name, osm_id, aggregated_source_id):
"""This auxiliary function queries the 'table_name' table of the test database to retrieve
the building classes (and related attributes) associated with an OBM building with ID
'osm_id' as per an aggregated model with 'aggregated_source_id'.
Args:
credentials (dict):
......@@ -360,6 +426,8 @@ def query_OBM_building_classes(credentials, osm_id, aggregated_source_id):
User name to connect to the SQL database.
password (str):
Password associated with self.username.
table_name (str):
Name of the database table to query.
osm_id (int):
OSM ID of the building to query.
aggregated_source_id (int):
......@@ -390,8 +458,9 @@ def query_OBM_building_classes(credentials, osm_id, aggregated_source_id):
sql_command = (
"SELECT occupancy_case, data_unit_id, quadkey, building_class_names, settlement_types, "
"occupancy_subtypes, probabilities, geometry FROM gde_buildings "
"WHERE (osm_id=%s AND aggregated_source_id=%s);" % (osm_id, aggregated_source_id)
"occupancy_subtypes, probabilities, geometry FROM %s "
"WHERE (osm_id=%s AND aggregated_source_id=%s);"
% (table_name, osm_id, aggregated_source_id)
)
db_test = Database(**credentials)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment