Skip to content
Snippets Groups Projects

Add server-side cursor and query iteration

Merged Danijel Schorlemmer requested to merge feature/query_iteration into master
Compare and
1 file
+ 42
23
Compare changes
  • Side-by-side
  • Inline
+ 42
23
@@ -1462,11 +1462,11 @@ class SpatiaLiteExposure(SpatiaLiteDatabase, AbstractExposure):
if input_model == "baseline":
entity_table = "EntityReference"
asset_table = "AssetReference"
osm_id_extraction_statement = "NULL,"
osm_id_extraction_statement = "NULL"
else:
entity_table = "Entity"
asset_table = "Asset"
osm_id_extraction_statement = "Entity.osm_id,"
osm_id_extraction_statement = "Entity.osm_id"
# Create the exposure import-area restrictive SQL statement
area_restriction_clause = ""
@@ -1495,16 +1495,23 @@ class SpatiaLiteExposure(SpatiaLiteDatabase, AbstractExposure):
else:
sql_statement = f"""
SELECT {entity_table}.quadkey, taxonomy_id, number, structural, night,
{osm_id_extraction_statement}
{osm_id_extraction_statement},
ST_AsText({entity_table}.geom)
FROM {entity_table}
INNER JOIN {asset_table} ON {entity_table}.id = {asset_table}.entity_id
{area_restriction_clause}
"""
exposure_source_db.cursor.execute(sql_statement)
assets = exposure_source_db.cursor.fetchall()
assets = exposure_source_db.execute_server_side(sql_statement)
logger.debug(f"Insert {input_model} entities and assets")
for quadkey, taxonomy_id, number, structural, night, osm_id, geom_wkt in assets:
for asset_count, (
quadkey,
taxonomy_id,
number,
structural,
night,
osm_id,
geom_wkt,
) in enumerate(assets):
if output_model == "baseline":
entity_id = self.insert_reference_entity(quadkey)
self.insert_reference_asset(entity_id, taxonomy_id, number, structural, night)
@@ -1514,6 +1521,9 @@ class SpatiaLiteExposure(SpatiaLiteDatabase, AbstractExposure):
else:
entity_id = self.insert_building_entity(quadkey, osm_id, geom_wkt)[0]
self.insert_asset(entity_id, taxonomy_id, number, structural, night)
if asset_count + 1 % 10000 == 0:
logger.debug(f"{asset_count} assets imported")
self.connection.commit()
self.connection.commit()
logger.debug(f"""Entities and assets of the {input_model} model imported""")
@@ -1571,11 +1581,13 @@ class SpatiaLiteExposure(SpatiaLiteDatabase, AbstractExposure):
Args:
exposure_source_db_config (dictionary):
Dictionary containing the connection credentials for the exposure database:
`host`: host name of the server
`dbname`: name of the database
`port`: port to access the server
`username`: name of the database user
`password`: password of the database user
`host` : Host name of the server
`dbname` : Name of the database
`port` : Port to access the server
`username`: Name of the database user
`password`: Password of the database user
`timeout` : Database timeout (set to 0 for no timeout).
`itersize`: Number of rows fetched from the server at one call.
model (str, default: `standard`):
Defines which model to import. The following options are possible:
`standard` : Imports the standard model
@@ -1736,18 +1748,16 @@ class SpatiaLiteExposure(SpatiaLiteDatabase, AbstractExposure):
FROM tile_best_estimate
{area_restriction_clause}
"""
exposure_source_db.cursor.execute(sql_statement)
tile_information = exposure_source_db.cursor.fetchall()
tiles = exposure_source_db.execute_server_side(sql_statement)
logger.debug("Insert tiles")
for (
for tile_count, (
quadkey,
geom_wkt,
built_area_size,
built_up_ratio,
completeness,
country_iso_code,
) in tile_information:
) in enumerate(tiles):
self.insert_tile(
quadkey,
country_iso_code,
@@ -1756,6 +1766,9 @@ class SpatiaLiteExposure(SpatiaLiteDatabase, AbstractExposure):
completeness if completeness else "NULL",
geom_wkt,
)
if tile_count + 1 % 10000 == 0:
logger.debug(f"{tile_count} tiles imported")
self.connection.commit()
logger.debug("Tiles imported")
self.connection.commit()
@@ -1816,14 +1829,18 @@ class SpatiaLiteExposure(SpatiaLiteDatabase, AbstractExposure):
FROM obm_buildings
{area_restriction_clause}
"""
obm_source_db.cursor.execute(sql_statement)
building_information = obm_source_db.cursor.fetchall()
buildings = obm_source_db.execute_server_side(sql_statement)
logger.debug("Insert buildings")
for (osm_id, quadkey, occupancy, stories, geom_wkt) in building_information:
for building_count, (osm_id, quadkey, occupancy, stories, geom_wkt) in enumerate(
buildings
):
self.insert_building(
osm_id, quadkey, geom_wkt, occupancy=occupancy, stories=stories
)
if building_count + 1 % 10000 == 0:
logger.debug(f"{building_count} buildings imported")
self.connection.commit()
logger.debug("Buildings imported")
logger.debug("Import finished")
self.connection.commit()
@@ -2737,7 +2754,7 @@ class PostGISExposure(PostGISDatabase, AbstractExposure):
boundary_id_field = "boundary_id"
building_table = "obm_buildings"
def __init__(self, host, dbname, port, username, password, timeout=None):
def __init__(self, host, dbname, port, username, password, timeout=None, itersize=None):
"""
Initializes the PostGIS database
@@ -2754,9 +2771,11 @@ class PostGISExposure(PostGISDatabase, AbstractExposure):
Password of the user to access the database.
timeout (int, optional):
Default timeout for SQL queries in milliseconds.
itersize (int, optional):
Number of rows fetched from the server at one call.
"""
super().__init__(host, dbname, port, username, password, timeout)
super().__init__(host, dbname, port, username, password, timeout, itersize)
def insert_assets(self, assets):
"""
@@ -2776,7 +2795,7 @@ class PostGISExposure(PostGISDatabase, AbstractExposure):
structural (float):
Structural value of the asset
night (float):
Number of people in the asset at night time
Number of people in the asset at nighttime
"""
vals = ",".join(
Loading