Skip to content
Snippets Groups Projects
Commit 5172f67b authored by Laurens Oostwegel's avatar Laurens Oostwegel Committed by Laurens Oostwegel
Browse files

Implement execution on multiple cores

parent dd95ab73
No related branches found
No related tags found
1 merge request!29Resolve "Implement multiprocessing using PostGIS"
Pipeline #64562 passed
......@@ -9,6 +9,7 @@ from OpenQuake exposure files.
|---------|-------------------------------------------------------------------------------------------------------|
| 1.0.0 | Code used for the 2023.01 exposure data release. First published version of the exposure-initializer. |
| 2.0.0 | Scaling of floor-space area for each building is used. |
| 2.1.0 | Multi-core processing implemented for PostGIS databases. |
## How to install
First, the package registry needs to be added to the pip index urls, so the package (and its dependencies) can be found.
......
......@@ -24,6 +24,11 @@ import configparser
import datetime
import glob
import csv
import time
import copy
from multiprocessing import Pool, Queue
from queue import Empty
from psycopg2 import Error as psycopg2_error
from databaselib.database import PostGISDatabase
from exposurelib.database import SpatiaLiteExposure, PostGISExposure
from taxonomylib import Taxonomy
......@@ -200,7 +205,74 @@ class ExposureInitializer:
asset_dict[taxonomy_id]["floorspace"] += asset["floorspace"]
return asset_dict
def process_district(self, boundary_id, country_iso_code, asset_dict):
def multiprocess_districts(self, districts, num_processors):
    """
    Initializes the queue and a pool of workers that run the `process_district` function on
    one core each.

    A shared multiprocessing queue is filled with one task per district; each pool
    process runs `worker`, which drains the queue until it is empty. Note that
    `worker` is passed as the pool *initializer*, so it starts immediately in every
    pool process and acts as that process's long-running task.

    Args:
        districts (list):
            A list of all districts, containing the information below for each district:
                Boundary ID
                Country ISO code
                Asset dictionary
        num_processors (int):
            Number of processors that are used in the multiprocessing pool.
    """
    logger.info(
        f"Starting multiprocessing of districts with {num_processors} number of cores."
    )
    queue = Queue()
    # NOTE(review): the pool is sized `num_processors - 1` — presumably the main
    # process (which polls the queue below) is counted as the remaining core;
    # confirm this is intended, since `num_processors=1` would create a 0-size pool.
    pool = Pool(num_processors - 1, self.worker, (queue,))
    # Fill the queue
    for boundary_id, country_iso_code, asset_dict in districts:
        queue.put([boundary_id, country_iso_code, asset_dict])
    # Wait until the queue is empty.
    # NOTE(review): `Queue.qsize()` is approximate and raises NotImplementedError
    # on some platforms (e.g. macOS) — verify target platforms support it.
    while queue.qsize() > 0:
        logger.info(f"Length queue: {queue.qsize()}")
        time.sleep(10)
    # Close the pool to prevent the pool from accepting new tasks
    pool.close()
    # Wait for all processes to finish
    pool.join()
    logger.info("All districts are processed.")
def worker(self, queue):
    """
    One worker in a multiprocessing pool that processes districts from a queue.

    Runs until the queue stays empty for the `get` timeout, then closes its
    database connection and returns.

    Args:
        queue:
            Multiprocessing queue.
    """
    # Work on a private copy of the exposure database so that this process
    # holds its own connection.
    db = copy.copy(self.exposure_db)
    db.connect()
    while True:
        try:
            district_id, iso_code, assets = queue.get(timeout=5)
        except Empty:
            # No task arrived within the timeout: the queue is drained and
            # this worker can stop.
            break
        try:
            self.process_district(district_id, iso_code, assets, db)
        except psycopg2_error as e:
            # On any PostGIS failure, log it, hand the district back to the
            # queue and re-establish the connection before continuing.
            logger.warning(e)
            logger.warning(f"Put district {district_id} back into the queue.")
            queue.put([district_id, iso_code, assets])
            db.close()
            db.connect()
    db.close()
def process_district(self, boundary_id, country_iso_code, asset_dict, exposure_db=None):
"""
Processes all assets provided in `asset_list` for one district with `boundary_id`.
First, the asset list is reduced to unique taxonomy entries by summing up all key
......@@ -218,10 +290,16 @@ class ExposureInitializer:
ISO 3166-1 alpha-3 code of the country
asset_dict (dict):
Dictionary of asset values
exposure_db (AbstractExposure, optional):
If given, this exposure database is used for the SQL execution. Otherwise, the
default exposure database is used.
"""
if exposure_db is None:
exposure_db = self.exposure_db
# Check if the boundary exists in the database and retrieve it
boundary_geometry = self.exposure_db.get_boundary_geometry(boundary_id)
boundary_geometry = exposure_db.get_boundary_geometry(boundary_id)
if boundary_geometry is None:
logger.warning(f"{boundary_id}: Boundary geometry is empty.")
return
......@@ -235,17 +313,17 @@ class ExposureInitializer:
where_clause = f"""
WHERE ST_Contains(
ST_GeomFromText('{boundary_geometry}', 4326),
ST_Centroid({self.exposure_db.geometry_field})
ST_Centroid({exposure_db.geometry_field})
)
AND built_area_size IS NOT NULL"""
if isinstance(self.exposure_db, PostGISExposure):
if isinstance(exposure_db, PostGISExposure):
where_clause += f"""
AND {self.exposure_db.geometry_field}
AND {exposure_db.geometry_field}
&& ST_GeomFromText('{boundary_geometry}', 4326)
"""
basic_sql_statement = f"""
SELECT quadkey, built_area_size, {self.exposure_db.geometry_field}
FROM {self.exposure_db.tile_view}
SELECT quadkey, built_area_size, {exposure_db.geometry_field}
FROM {exposure_db.tile_view}
{where_clause}
"""
......@@ -254,8 +332,8 @@ class ExposureInitializer:
SELECT SUM(A.built_area_size) AS total_built_area
FROM ({basic_sql_statement}) AS A
"""
self.exposure_db.cursor.execute(sql_statement)
total_built_area = self.exposure_db.cursor.fetchone()[0]
exposure_db.cursor.execute(sql_statement)
total_built_area = exposure_db.cursor.fetchone()[0]
if total_built_area is None:
logger.warning(f"District {boundary_id} contains no tiles with built area.")
return
......@@ -265,15 +343,15 @@ class ExposureInitializer:
SELECT A.quadkey, A.built_area_size
FROM ({basic_sql_statement}) AS A
"""
self.exposure_db.cursor.execute(sql_statement)
tiles = self.exposure_db.cursor.fetchall()
exposure_db.cursor.execute(sql_statement)
tiles = exposure_db.cursor.fetchall()
# Add reference entities to all tiles of the district
for quadkey, built_area_size in tiles:
# Check if entity exists in EntityReference and create if necessary
entity_id = self.exposure_db.get_reference_entity_id(quadkey)
entity_id = exposure_db.get_reference_entity_id(quadkey)
if entity_id is None:
entity_id = self.exposure_db.insert_reference_entity(quadkey, country_iso_code)
entity_id = exposure_db.insert_reference_entity(quadkey, country_iso_code)
# Add the respective assets by proportion of built area to `AssetReference`. The
# structural and night values per square meter are added, if the total built
......@@ -291,13 +369,15 @@ class ExposureInitializer:
]
for taxonomy_id, asset in asset_dict.items()
]
self.exposure_db.insert_reference_assets(reference_assets)
self.exposure_db.connection.commit()
exposure_db.insert_reference_assets(reference_assets)
exposure_db.connection.commit()
logger.info(
f"{boundary_id}: {len(tiles)} entities added, built-area size: {total_built_area}"
)
def import_exposure(self, exposure_model_search_pattern, country_iso_code):
def import_exposure(
self, exposure_model_search_pattern, country_iso_code, num_processors=1
):
"""
Imports an OpenQuake-compatible aggregated exposure model into baseline entities and
baseline assets. The function reads in all exposure models matching the search
......@@ -312,9 +392,12 @@ class ExposureInitializer:
Search pattern to identify all necessary exposure filepaths.
country_iso_code (str):
ISO 3166-1 alpha-3 code of the country.
num_processors (int, optional, default: 1):
Number of processors used in multi-core processing.
"""
country_asset_dict = {}
districts = []
# Iterate through all given exposure files
for exposure_filepath in glob.glob(exposure_model_search_pattern):
logger.info(f"Processing {exposure_filepath}")
......@@ -331,8 +414,7 @@ class ExposureInitializer:
# Check if the line starts the asset list of a new location
if not (last_boundary_id == row["BOUNDARY_ID"]):
if location_count > 0:
pass
self.process_district(boundary_id, country_iso_code, asset_dict)
districts.append([boundary_id, country_iso_code, asset_dict])
location_count += 1
boundary_id = row["BOUNDARY_ID"]
asset_dict = {} # Reset the location-based asset dictionary
......@@ -370,7 +452,15 @@ class ExposureInitializer:
country_asset_dict, taxonomy_id, asset
)
self.process_district(boundary_id, country_iso_code, asset_dict)
districts.append([boundary_id, country_iso_code, asset_dict])
# If there is more than one processor, process the districts with the
# `multiprocess_districts` function
if num_processors == 1:
for boundary_id, country_iso_code, asset_dict in districts:
self.process_district(boundary_id, country_iso_code, asset_dict)
else:
self.multiprocess_districts(districts, num_processors)
logger.info("Assign the country-average assets")
# Normalize the country-average asset distribution
......@@ -610,11 +700,20 @@ def command_line_interface():
help="Filepath to the SpatiaLite exposure database",
)
# Create the parser for the 'postgis' command
subparsers.add_parser(
parser_postgis = subparsers.add_parser(
"postgis",
help="Run initializer on a (server-based) PostGIS database",
parents=[parser_shared],
)
parser_postgis.add_argument(
"-p",
"--num_processors",
required=False,
default=1,
type=int,
help="Number of processors to be used in multi-core processing (1: single-core "
"processing)",
)
# Read arguments from command line
args = parser.parse_args()
......@@ -628,12 +727,15 @@ def command_line_interface():
postgis_config.read(postgis_config_file)
# Initialize database
exposure_database_object = None
if database_type == "spatialite":
exposure_database_object = SpatiaLiteExposure(args.exposure_database)
num_processors = 1
elif database_type == "postgis":
exposure_db_config = dict(postgis_config["Exposure"])
exposure_database_object = PostGISExposure(**exposure_db_config)
num_processors = args.num_processors
else:
raise ValueError("The database can only be of type `postgis` or `spatialite`.")
# Initialize timer
start_time = datetime.datetime.now()
......@@ -650,7 +752,7 @@ def command_line_interface():
else:
initializer.exposure_db.remove_exposure(country_iso_code, delete_reference=True)
initializer.exposure_db.remove_asset_country(country_iso_code)
initializer.import_exposure(exposure_model, country_iso_code)
initializer.import_exposure(exposure_model, country_iso_code, num_processors)
initializer.process_gaps_in_exposure(country_iso_code)
initializer.exposure_db.commit_and_close()
......
......@@ -29,7 +29,7 @@ setup(
license="AGPLv3+",
install_requires=[
"numpy",
"exposurelib>=2.0.1",
"exposurelib>=2.0.2",
"taxonomylib>=1.1.0",
],
extras_require={
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment