Commit 5cffe0e6 authored by Cecilia Nievas's avatar Cecilia Nievas
Browse files

Parallelised retrieval of quadkeys to run

parent 14b6ba00
Pipeline #47654 passed with stage
in 2 minutes and 22 seconds
......@@ -20,6 +20,8 @@ import sys
import logging
import mercantile
from copy import deepcopy
from multiprocessing import Pool
from functools import partial
from gdeimporter.tools.configuration_methods import ConfigurationMethods
from gdeimporter.exposureentity import ExposureEntity
from gdeexporter.database_queries import DatabaseQueries
......@@ -426,12 +428,14 @@ class Configuration:
This function will retrieve the quadkeys associated with each of the exposure
entities especified in self.exposure_entities_to_run and 'aggregated_source_id'
from the 'db_table' of the database whose credentials are given in
'db_gde_tiles_config'.
'db_gde_tiles_config'. The retrieval of quadkeys is parallelised and uses
self.number_cores.
selection_mode = "data_unit_id":
This function will retrieve the quadkeys associated with each of the data units
especified in self.geographic_selection["data_unit_ids"] and
'aggregated_source_id' from the 'db_table' of the database whose credentials are
given in 'db_gde_tiles_config'.
given in 'db_gde_tiles_config'. The retrieval of quadkeys is parallelised and
uses self.number_cores.
selection_mode = "quadkeys":
This function will retrieve the quadkeys specified in the TXT file whose file
path is indicated in self.geographic_selection["quadkeys_file"].
......@@ -458,8 +462,8 @@ class Configuration:
password (str):
Password associated with self.username.
db_table (str):
Name of the table of the SQL database where the data units are stored. It is
assumed that this table contains, at least, the following fields:
Name of the table of the SQL database where the data unit tiles are stored. It
is assumed that this table contains, at least, the following fields:
quadkey (str):
String indicating the quadkey of a tile.
aggregated_source_id (int):
......@@ -476,19 +480,24 @@ class Configuration:
if self.geographic_selection["selection_mode"].lower() == "exposure_entity":
quadkeys_to_process = {}
number_quadkeys = 0
for exposure_entity_code in self.exposure_entities_to_run:
quadkeys_list = (
DatabaseQueries.retrieve_quadkeys_by_exposure_entity_aggregated_source_id(
exposure_entity_code,
aggregated_source_id,
self.database_gde_tiles,
"data_unit_tiles",
)
)
if len(quadkeys_list) > 0:
quadkeys_to_process[exposure_entity_code] = quadkeys_list
number_quadkeys += len(quadkeys_list)
p = Pool(processes=self.number_cores)
func = partial(
DatabaseQueries.retrieve_quadkeys_by_exposure_entity_aggregated_source_id,
aggregated_source_id,
self.database_gde_tiles,
db_table,
)
lists_of_quadkeys_per_exposure_entity = p.map(func, self.exposure_entities_to_run)
p.close()
p.join()
for i, exposure_entity_code in enumerate(self.exposure_entities_to_run):
if len(lists_of_quadkeys_per_exposure_entity[i]) > 0:
quadkeys_to_process[
exposure_entity_code
] = lists_of_quadkeys_per_exposure_entity[i]
number_quadkeys += len(lists_of_quadkeys_per_exposure_entity[i])
else:
logger.info(
"No quadkeys found for exposure entity '%s', skipping"
......@@ -498,19 +507,22 @@ class Configuration:
if self.geographic_selection["selection_mode"].lower() == "data_unit_id":
quadkeys_to_process = {}
number_quadkeys = 0
for data_unit_id in self.geographic_selection["data_unit_ids"]:
quadkeys_list = (
DatabaseQueries.retrieve_quadkeys_by_data_unit_id_aggregated_source_id(
data_unit_id,
aggregated_source_id,
self.database_gde_tiles,
"data_unit_tiles",
)
)
if len(quadkeys_list) > 0:
quadkeys_to_process[data_unit_id] = quadkeys_list
number_quadkeys += len(quadkeys_list)
p = Pool(processes=self.number_cores)
func = partial(
DatabaseQueries.retrieve_quadkeys_by_data_unit_id_aggregated_source_id,
aggregated_source_id,
self.database_gde_tiles,
db_table,
)
lists_of_quadkeys_per_unit = p.map(func, self.geographic_selection["data_unit_ids"])
p.close()
p.join()
for i, data_unit_id in enumerate(self.geographic_selection["data_unit_ids"]):
if len(lists_of_quadkeys_per_unit[i]) > 0:
quadkeys_to_process[data_unit_id] = lists_of_quadkeys_per_unit[i]
number_quadkeys += len(lists_of_quadkeys_per_unit[i])
else:
logger.info(
"No quadkeys found for data unit '%s', skipping" % (data_unit_id)
......
......@@ -153,7 +153,7 @@ class DatabaseQueries:
@staticmethod
def retrieve_quadkeys_by_exposure_entity_aggregated_source_id(
exposure_entity, aggregated_source_id, db_gde_tiles_config, db_table
aggregated_source_id, db_gde_tiles_config, db_table, exposure_entity
):
"""
This function retrives all quadkeys associated with 'exposure_entity' and
......@@ -161,9 +161,6 @@ class DatabaseQueries:
'db_gde_tiles_config'.
Args:
exposure_entity (str):
3-character code of the exposure entity for which the data unit IDs and
geometries will be retrieved.
aggregated_source_id (int):
ID of the source of the aggregated exposure model for which the data unit IDs
and geometries will be retrieved.
......@@ -190,6 +187,9 @@ class DatabaseQueries:
ID of the source of the aggregated exposure model.
exposure_entity (str):
3-character code of the exposure entity.
exposure_entity (str):
3-character code of the exposure entity for which the data unit IDs and
geometries will be retrieved.
Returns:
quadkeys (list of str):
......@@ -219,7 +219,7 @@ class DatabaseQueries:
@staticmethod
def retrieve_quadkeys_by_data_unit_id_aggregated_source_id(
data_unit_id, aggregated_source_id, db_gde_tiles_config, db_table
aggregated_source_id, db_gde_tiles_config, db_table, data_unit_id
):
"""
This function retrives all quadkeys associated with 'data_unit_id' and
......@@ -227,8 +227,6 @@ class DatabaseQueries:
'db_gde_tiles_config'.
Args:
data_unit_id (str):
ID of the data unit for which the quadkeys will be retrieved.
aggregated_source_id (int):
ID of the source of the aggregated exposure model for which the data unit IDs
and geometries will be retrieved.
......@@ -255,6 +253,8 @@ class DatabaseQueries:
ID of the source of the aggregated exposure model.
data_unit_id (str):
ID of the data unit.
data_unit_id (str):
ID of the data unit for which the quadkeys will be retrieved.
Returns:
quadkeys (list of str):
......
......@@ -89,7 +89,7 @@ def test_retrieve_quadkeys_by_exposure_entity_aggregated_source_id(test_db):
returned_quadkeys = (
DatabaseQueries.retrieve_quadkeys_by_exposure_entity_aggregated_source_id(
"ABC", 2, config.database_gde_tiles, "data_unit_tiles"
2, config.database_gde_tiles, "data_unit_tiles", "ABC"
)
)
......@@ -107,7 +107,7 @@ def test_retrieve_quadkeys_by_exposure_entity_aggregated_source_id(test_db):
returned_quadkeys = (
DatabaseQueries.retrieve_quadkeys_by_exposure_entity_aggregated_source_id(
"XYZ", 2, config.database_gde_tiles, "data_unit_tiles"
2, config.database_gde_tiles, "data_unit_tiles", "XYZ"
)
)
......@@ -122,7 +122,7 @@ def test_retrieve_quadkeys_by_data_unit_id_aggregated_source_id(test_db):
)
returned_quadkeys = DatabaseQueries.retrieve_quadkeys_by_data_unit_id_aggregated_source_id(
"ABC_10269", 2, config.database_gde_tiles, "data_unit_tiles"
2, config.database_gde_tiles, "data_unit_tiles", "ABC_10269"
)
expected_quadkeys = [
......@@ -136,7 +136,7 @@ def test_retrieve_quadkeys_by_data_unit_id_aggregated_source_id(test_db):
assert len(returned_quadkeys) == len(expected_quadkeys)
returned_quadkeys = DatabaseQueries.retrieve_quadkeys_by_data_unit_id_aggregated_source_id(
"ABC_10269", 77, config.database_gde_tiles, "data_unit_tiles"
77, config.database_gde_tiles, "data_unit_tiles", "ABC_10269"
)
assert len(returned_quadkeys) == 0
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment