Dynamic Exposure / Global Dynamic Exposure / gde-importer · Commits · 38eec665

Commit 38eec665, authored Nov 16, 2021 by Cecilia Nievas
Added feature to write data units attributes to database table
Parent: 2ba96d5b
Pipeline #30836 passed in 2 minutes and 17 seconds
Changes: 12 | Pipelines: 2
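In short, this commit adds a store_data_units() method to AggregatedExposureModel and a write_data_unit_to_database() method to DataUnit, and wires them into main() in gdeimporter.py. A minimal sketch of the resulting call chain follows; it is illustrative only, and assumes config, aem and aem_source_id are set up as elsewhere in gdeimporter.py:

# Sketch of the call chain introduced by this commit (not verbatim from the diff).
for exposure_entity_name in config.exposure_entities_to_run:
    for occupancy_case in config.occupancies_to_run:
        aem.get_data_units(config, exposure_entity_name, occupancy_case)
        # New: persist the data units of this entity/occupancy case to the
        # "data_units" table of the GDE Tiles database.
        aem.store_data_units(
            config.database_gde_tiles,  # credentials dict (host, dbname, port, username, password)
            "data_units",               # target table
            exposure_entity_name,
            occupancy_case,
            aem_source_id,
        )
# store_data_units() then calls DataUnit.write_data_unit_to_database() for each data
# unit, which inserts a new row or updates the existing one.
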
gdeimporter/aggregatedexposuremodel.py
@@ -170,6 +170,89 @@ class AggregatedExposureModel(abc.ABC):
        return aggregated_source_id, writing_mode

    def store_data_units(
        self,
        db_data_units_config,
        db_table,
        exposure_entity_name,
        occupancy_case,
        aggregated_source_id,
    ):
        """This function stores all data units associated with
        self.exposure_entities[exposure_entity_name].occupancy_cases[occupancy_case] to the
        table db_table in the database whose credentials are indicated in db_data_units_config.

        Args:
            db_data_units_config (dict):
                Dictionary containing the credentials needed to connect to the SQL database in
                which information on the data units is stored. The keys of the dictionary need
                to be:
                    host (str):
                        SQL database host address.
                    dbname (str):
                        Name of the SQL database.
                    port (int):
                        Port where the SQL database can be found.
                    username (str):
                        User name to connect to the SQL database.
                    password (str):
                        Password associated with self.username.
            db_table (str):
                Name of the table of the SQL database where the data units will be stored. It is
                assumed that this table contains, at least, the following fields:
                    data_unit_id (str):
                        ID of the data unit.
                    exposure_entity (str):
                        3-character code of the exposure entity.
                    occupancy_case (enum):
                        SQL enumerated type describing the building occupancy cases.
                    aggregated_source_id (int):
                        ID of the source of the aggregated exposure model.
                    buildings_total (float):
                        Total number of buildings in the DataUnit.
                    dwellings_total (float):
                        Total number of dwellings in the DataUnit.
                    people_census (float):
                        Total number of census people in the DataUnit.
                    cost_total (float):
                        Total replacement cost of buildings in the DataUnit.
            exposure_entity_name (str):
                Name of the ExposureEntity whose data units will be stored. It needs to be a
                key of self.exposure_entities and self.exposure_entities[exposure_entity_name]
                needs to have at least the following attributes:
                    code (str):
                        3-character code that uniquely identifies this ExposureEntity.
                    occupancy_cases (dict):
                        Dictionary as defined in the attributes of ExposureEntity.
                        'occupancy_case' needs to be one of its keys, with existing subkey
                        'data_units'.
            occupancy_case (str):
                Name of the occupancy case (e.g. "residential", "commercial", "industrial")
                associated with this Data Unit. It needs to be a key of
                self.exposure_entities[exposure_entity_name].occupancy_cases.
            aggregated_source_id (int):
                ID of the source of the aggregated exposure model.

        Returns:
            This function calls DataUnit.write_data_unit_to_database() to store attributes of
            all data units present in self.exposure_entities[exposure_entity_name]
            .occupancy_cases[occupancy_case]["data_units"].
        """

        data_units = self.exposure_entities[exposure_entity_name].occupancy_cases[
            occupancy_case
        ]["data_units"]

        for data_unit_id in data_units.keys():
            data_units[data_unit_id].write_data_unit_to_database(
                db_data_units_config,
                db_table,
                aggregated_source_id,
                occupancy_case,
                self.exposure_entities[exposure_entity_name].code,
            )

        return
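
For illustration only (not part of the commit): a credentials dictionary with the keys listed in the docstring above, and a call to the new method. All concrete values, including the entity name "Greece", are placeholders.

# Hypothetical usage sketch; every value below is a placeholder.
db_data_units_config = {
    "host": "localhost",      # SQL database host address
    "dbname": "gde_tiles",    # name of the SQL database
    "port": 5432,             # port where the SQL database can be found
    "username": "gde_user",   # user name to connect to the SQL database
    "password": "secret",     # password associated with the user name
}

# aem is an instance of a subclass of AggregatedExposureModel (e.g. ExposureModelESRM20)
# whose data units have already been retrieved with get_data_units().
aem.store_data_units(
    db_data_units_config,
    "data_units",   # table of the GDE Tiles database
    "Greece",       # must be a key of aem.exposure_entities
    "residential",  # occupancy case
    1,              # aggregated_source_id
)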

class ExposureModelESRM20(AggregatedExposureModel):
    """This class represents the European Seismic Risk Model 2020 (ESRM20) aggregated exposure
    ...
@@ -435,7 +518,7 @@ class ExposureModelESRM20(AggregatedExposureModel):
            exposure_entity_name (str):
                Name of the ExposureEntity whose data units will be retrieved. It needs to be a
                key of self.exposure_entities and self.exposure_entities[exposure_entity_name]
-               needs to have at least the following attribute:
+               needs to have at least the following attributes:
                    name (str):
                        Name of the ExposureEntity.
                    occupancy_cases (dict):
                        ...
gdeimporter/dataunit.py
@@ -18,6 +18,7 @@
import logging

import numpy

from gdeimporter.tools.database import Database

logger = logging.getLogger()
@@ -248,3 +249,139 @@ class DataUnit:
        )

        return geometry

    def write_data_unit_to_database(
        self,
        db_data_units_config,
        db_table,
        aggregated_source_id,
        occupancy_case,
        exposure_entity_code,
    ):
        """This function writes the attributes of the DataUnit to the table with name db_table
        in the database whose credentials are indicated in db_data_units_config. If an entry
        already exists in the database for this DataUnit, the attributes are updated. If an
        entry does not exist beforehand, it is created.

        Args:
            db_data_units_config (dict):
                Dictionary containing the credentials needed to connect to the SQL database in
                which information on the data units is stored. The keys of the dictionary need
                to be:
                    host (str):
                        SQL database host address.
                    dbname (str):
                        Name of the SQL database.
                    port (int):
                        Port where the SQL database can be found.
                    username (str):
                        User name to connect to the SQL database.
                    password (str):
                        Password associated with self.username.
            db_table (str):
                Name of the table of the SQL database where the data units will be stored. It is
                assumed that this table contains, at least, the following fields:
                    data_unit_id (str):
                        ID of the data unit.
                    occupancy_case (enum):
                        SQL enumerated type describing the building occupancy cases.
                    aggregated_source_id (int):
                        ID of the source of the aggregated exposure model.
                    exposure_entity (str):
                        3-character code of the exposure entity.
                    buildings_total (float):
                        Total number of buildings in the DataUnit.
                    dwellings_total (float):
                        Total number of dwellings in the DataUnit.
                    people_census (float):
                        Total number of census people in the DataUnit.
                    cost_total (float):
                        Total replacement cost of buildings in the DataUnit.
            aggregated_source_id (int):
                ID of the source of the aggregated exposure model.
            occupancy_case (str):
                Name of the occupancy case (e.g. "residential", "commercial", "industrial")
                associated with this data_unit.
            exposure_entity_code (str):
                3-character string containing the ISO3 code of the country, if the exposure
                entity is a country, or a code that the user defines in the configuration file.

        Returns:
            This function writes to the table with name db_table in the database whose
            credentials are indicated in db_data_units_config.
        """

        data_unit_full_id = "%s_%s" % (exposure_entity_code, self.id)

        sql_commands = {}
        sql_commands["query"] = "SELECT COUNT(*) FROM %s"
        sql_commands["query"] += " WHERE (data_unit_id='%s' AND occupancy_case='%s'"
        sql_commands["query"] += " AND aggregated_source_id='%s');"

        sql_commands["update"] = "UPDATE %s"
        sql_commands["update"] += " SET (exposure_entity, buildings_total, dwellings_total,"
        sql_commands["update"] += " people_census, cost_total)"
        sql_commands["update"] += " = ('%s','%s','%s','%s','%s')"
        sql_commands["update"] += " WHERE (data_unit_id='%s' AND occupancy_case='%s'"
        sql_commands["update"] += " AND aggregated_source_id='%s');"

        sql_commands["insert"] = "INSERT INTO"
        sql_commands["insert"] += " %s(data_unit_id, occupancy_case, aggregated_source_id,"
        sql_commands["insert"] += " exposure_entity, buildings_total, dwellings_total,"
        sql_commands["insert"] += " people_census, cost_total)"
        sql_commands["insert"] += " VALUES('%s','%s','%s','%s','%s','%s','%s','%s');"

        db_gde_tiles = Database(**db_data_units_config)
        db_gde_tiles.create_connection_and_cursor()

        # Check if an entry already exists for this data unit
        db_gde_tiles.cursor.execute(
            sql_commands["query"]
            % (db_table, data_unit_full_id, occupancy_case, str(aggregated_source_id))
        )
        exec_result = db_gde_tiles.cursor.fetchall()

        if exec_result[0][0] > 0:
            # Entry exists --> update
            db_gde_tiles.cursor.execute(
                sql_commands["update"]
                % (
                    db_table,
                    exposure_entity_code,
                    str(self.total_buildings),
                    str(self.total_dwellings),
                    str(self.total_people["Census"]),
                    str(self.total_cost["Total"]),
                    data_unit_full_id,
                    occupancy_case,
                    str(aggregated_source_id),
                )
            )
        elif exec_result[0][0] == 0:
            # No entry for this data unit exists --> append
            db_gde_tiles.cursor.execute(
                sql_commands["insert"]
                % (
                    db_table,
                    data_unit_full_id,
                    occupancy_case,
                    str(aggregated_source_id),
                    exposure_entity_code,
                    str(self.total_buildings),
                    str(self.total_dwellings),
                    str(self.total_people["Census"]),
                    str(self.total_cost["Total"]),
                )
            )
        else:
            # More than one entry was found; this is an error
            logger.error(
                "ERROR IN write_data_unit_to_database: MORE THAN ONE ENTRY FOUND FOR"
                " data_unit_id='%s' AND occupancy_case='%s' AND aggregated_source_id='%s'"
                % (data_unit_full_id, occupancy_case, aggregated_source_id)
            )

        db_gde_tiles.close_connection()

        return
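
A side note, not part of the commit: the query/update/insert strings above splice values into the SQL with the % operator. Since the test schema uses PostgreSQL features (SERIAL, ENUM) and the Database wrapper exposes a DB-API cursor, the same check-then-write logic could also be expressed as a single parameterized upsert. The sketch below is only illustrative; it assumes a PostgreSQL backend, a psycopg2-style cursor, the fixed table name data_units, and the primary key defined in tests/data/test_database_set_up.sql.

# Illustrative alternative only (assumes PostgreSQL and a psycopg2-style cursor).
# Placeholders are filled by the driver, so values never need manual quoting.
upsert = (
    "INSERT INTO data_units (data_unit_id, occupancy_case, aggregated_source_id,"
    " exposure_entity, buildings_total, dwellings_total, people_census, cost_total)"
    " VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
    " ON CONFLICT (data_unit_id, occupancy_case, aggregated_source_id)"
    " DO UPDATE SET exposure_entity = EXCLUDED.exposure_entity,"
    " buildings_total = EXCLUDED.buildings_total,"
    " dwellings_total = EXCLUDED.dwellings_total,"
    " people_census = EXCLUDED.people_census,"
    " cost_total = EXCLUDED.cost_total;"
)
db_gde_tiles.cursor.execute(
    upsert,
    (
        data_unit_full_id,
        occupancy_case,
        aggregated_source_id,
        exposure_entity_code,
        self.total_buildings,
        self.total_dwellings,
        self.total_people["Census"],
        self.total_cost["Total"],
    ),
)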
gdeimporter/gdeimporter.py
@@ -65,6 +65,13 @@ def main():
    for exposure_entity_name in config.exposure_entities_to_run:
        for occupancy_case in config.occupancies_to_run:
            aem.get_data_units(config, exposure_entity_name, occupancy_case)

            aem.store_data_units(
                config.database_gde_tiles,
                "data_units",
                exposure_entity_name,
                occupancy_case,
                aem_source_id,
            )

            aem.exposure_entities[exposure_entity_name].create_data_unit_tiles(
                occupancy_case,
                config.number_cores,
                ...
tests/conftest.py
@@ -30,6 +30,7 @@ def test_db():
"""A test database simulating to contain the following tables:
- obm_built_area_assessments (of the OBM Tiles database)
- aggregated_sources (of the GDE Tiles database)
- data_units (of the GDE Tiles database)
- data_unit_tiles (of the GDE Tiles database).
"""
...
...
@@ -39,7 +40,7 @@ def test_db():
def init_test_db():
    """Populates the test database that simulates to contain obm_built_area_assessments,
-   aggregated_sources and data_unit_tiles with a basic schema and data.
+   aggregated_sources, data_units and data_unit_tiles with a basic schema and data.
    """

    if "GDEIMPORTER_DB_HOST" in os.environ:
        # When running the CI pipeline
@@ -55,20 +56,8 @@ def init_test_db():
    db = Database(**db_built_up_config)
    db.create_connection_and_cursor()

-   # Create columns and populate the obm_built_area_assessments table
-   with open("tests/data/test_database_built_up.sql", "r") as file:
-       for command in file.read().split(";"):
-           if command != "\n":
-               db.cursor.execute(command)
-
-   # Create columns and populate the aggregated_sources table
-   with open("tests/data/test_database_aggregated_sources.sql", "r") as file:
-       for command in file.read().split(";"):
-           if command != "\n":
-               db.cursor.execute(command)
-
-   # Create columns and populate the aggregated_sources table
-   with open("tests/data/test_database_data_unit_tiles.sql", "r") as file:
+   # Create columns and populate the tables
+   with open("tests/data/test_database_set_up.sql", "r") as file:
        for command in file.read().split(";"):
            if command != "\n":
                db.cursor.execute(command)
tests/data/config_for_testing_exposure_entities_code_dict.yml → tests/data/config_for_testing_exposure_entities_code_iso3.yml
@@ -4,9 +4,7 @@ data_pathname: /some/path/to/directory
boundaries_pathname: /some/path/to/directory
occupancies_to_run: residential, commercial, industrial
exposure_entities_to_run: all
-exposure_entities_code:
-  Name 1: NM1
-  Name 2: XXX
+exposure_entities_code: ISO3
number_cores: 4
database_built_up:
  host: host.somewhere.xx
tests/data/config_for_testing_good.yml
@@ -4,7 +4,15 @@ data_pathname: /some/path/to/directory
boundaries_pathname: /some/path/to/directory
occupancies_to_run: residential, commercial, industrial
exposure_entities_to_run: all
-exposure_entities_code: ISO3
+exposure_entities_code:
+  Entity_1: EN1
+  Entity_2: EN2
+  Entity_3: EN3
+  Entity_4: EN4
+  Entity_5: EN5
+  Entity_6: EN6
+  Entity_7: EN7
+  Entity_8: EN8
number_cores: 4
database_built_up:
  host: host.somewhere.xx
tests/data/test_database_aggregated_sources.sql (deleted, 100644 → 0)
DROP TABLE IF EXISTS aggregated_sources;
CREATE TABLE aggregated_sources (
    aggregated_source_id SERIAL PRIMARY KEY,
    name varchar,
    format varchar
);
INSERT INTO aggregated_sources(name, format)
VALUES ('first_source', 'aaa'),
       ('second_source', 'bbb'),
       ('third_source', 'ccc'),
       ('first_source', 'ddd');
tests/data/test_database_built_up.sql (deleted, 100644 → 0)
DROP TABLE IF EXISTS obm_built_area_assessments;
CREATE TABLE obm_built_area_assessments (
    quadkey char(18),
    source_id SMALLINT,
    built_area_size FLOAT,
    PRIMARY KEY (quadkey, source_id)
);
INSERT INTO obm_built_area_assessments(quadkey, source_id, built_area_size)
VALUES ('122100203232131100', 1, 7512.38),
       ('122100203232131101', 1, 3449.81),
       ('122100203232131102', 1, 1266.45),
       ('122100203232131103', 1, 5016.22),
       ('120222331000133202', 1, 1823.553),
       ('120222331000133203', 1, 3391.246),
       ('120222331000133212', 1, 7403.582),
       ('120222331000133213', 1, 5937.64),
       ('120222331000133220', 1, 2072.341),
       ('120222331000133221', 1, 1227.524),
       ('120222331000133230', 1, 2921.676),
       ('120222331000133231', 1, 4244.951);
tests/data/test_database_data_unit_tiles.sql → tests/data/test_database_set_up.sql
DROP TABLE IF EXISTS obm_built_area_assessments;
DROP TABLE IF EXISTS aggregated_sources;
DROP TABLE IF EXISTS data_units;
DROP TABLE IF EXISTS data_unit_tiles;
DROP TYPE IF EXISTS occupancycase;
CREATE TYPE occupancycase AS ENUM ('residential', 'commercial', 'industrial');
CREATE TABLE obm_built_area_assessments (
    quadkey char(18),
    source_id SMALLINT,
    built_area_size FLOAT,
    PRIMARY KEY (quadkey, source_id)
);
INSERT INTO obm_built_area_assessments(quadkey, source_id, built_area_size)
VALUES ('122100203232131100', 1, 7512.38),
       ('122100203232131101', 1, 3449.81),
       ('122100203232131102', 1, 1266.45),
       ('122100203232131103', 1, 5016.22),
       ('120222331000133202', 1, 1823.553),
       ('120222331000133203', 1, 3391.246),
       ('120222331000133212', 1, 7403.582),
       ('120222331000133213', 1, 5937.64),
       ('120222331000133220', 1, 2072.341),
       ('120222331000133221', 1, 1227.524),
       ('120222331000133230', 1, 2921.676),
       ('120222331000133231', 1, 4244.951);
CREATE TABLE aggregated_sources (
    aggregated_source_id SERIAL PRIMARY KEY,
    name varchar,
    format varchar
);
INSERT INTO aggregated_sources(name, format)
VALUES ('first_source', 'aaa'),
       ('second_source', 'bbb'),
       ('third_source', 'ccc'),
       ('first_source', 'ddd');
CREATE TABLE data_unit_tiles (
    quadkey char(18),
    ...
@@ -25,3 +63,26 @@ INSERT INTO data_unit_tiles(quadkey,
    fraction_data_unit_area, fraction_data_unit_built_up_area)
VALUES ('120222331000133202', 2, 'residential', 'GRC', 'GRC_39821',
        2532.671, 287.720, 0.0572566, 0.086);
CREATE TABLE data_units (
    data_unit_id VARCHAR,
    occupancy_case occupancycase,
    aggregated_source_id SMALLINT,
    exposure_entity CHAR(3),
    buildings_total FLOAT,
    dwellings_total FLOAT,
    people_census FLOAT,
    cost_total FLOAT,
    PRIMARY KEY (data_unit_id, occupancy_case, aggregated_source_id)
);
INSERT INTO data_units(data_unit_id, occupancy_case, aggregated_source_id, exposure_entity,
                       buildings_total, dwellings_total, people_census, cost_total)
VALUES ('ABC_123456', 'residential', 17, 'ABC', 0.0, 0.0, 0.0, 0.0);
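
For orientation only (the test suite itself goes through the project's Database wrapper and the query_data_units helper): a minimal sketch of inspecting the seeded data_units row directly, assuming a PostgreSQL test database reachable with psycopg2 and placeholder credentials.

import psycopg2  # assumed driver for the PostgreSQL test database

# All connection values below are placeholders.
connection = psycopg2.connect(
    host="localhost", dbname="test_db", port=5432, user="tester", password="secret"
)
cursor = connection.cursor()
cursor.execute(
    "SELECT exposure_entity, buildings_total, dwellings_total, people_census, cost_total"
    " FROM data_units"
    " WHERE data_unit_id = %s AND occupancy_case = %s AND aggregated_source_id = %s;",
    ("ABC_123456", "residential", 17),
)
print(cursor.fetchall())  # expected: [('ABC', 0.0, 0.0, 0.0, 0.0)] for the seeded row
cursor.close()
connection.close()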
tests/test_aggregatedexposuremodel.py
@@ -23,9 +23,10 @@ import numpy
from gdeimporter.configuration import Configuration
from gdeimporter.aggregatedexposuremodel import AggregatedExposureModel, ExposureModelESRM20
from gdeimporter.tools.database import Database
from tests.test_dataunit import query_data_units


-def test_ExposureModelESRM20():
+def test_ExposureModelESRM20(test_db):
    # Test case in which the metadata file is not found
    config = Configuration(
        os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
    ...
@@ -406,6 +407,44 @@ def test_ExposureModelESRM20():
                4,
            )

            # Write data units to the test database
            returned_aem.store_data_units(
                config.database_gde_tiles,
                "data_units",
                entity_names[i],
                occupancy_names[i],
                18,
            )

    # Test that the data units have been stored correctly
    for row in range(2, 11):
        query_result = query_data_units(
            config.database_gde_tiles,
            "EN%s_%s"
            % (
                expected_results_totals["Col_0"].values[row][-1],
                expected_results_totals["Col_2"].values[row],
            ),
            expected_results_totals["Col_1"].values[row],
            18,
        )

        assert len(query_result) == 1  # one entry found in the database
        # exposure_entity
        assert query_result[0][0] == "EN%s" % (expected_results_totals["Col_0"].values[row][-1])
        assert round(query_result[0][1], 2) == round(
            float(expected_results_totals["Col_3"].values[row]), 2
        )  # total buildings
        assert round(query_result[0][2], 2) == round(
            float(expected_results_totals["Col_4"].values[row]), 2
        )  # total dwellings
        assert round(query_result[0][3], 2) == round(
            float(expected_results_totals["Col_5"].values[row]), 2
        )  # total people
        assert round(query_result[0][4], 0) == round(
            float(expected_results_totals["Col_10"].values[row]), 0
        )  # total cost

    assert (
        "data_units"
        not in returned_aem.exposure_entities["Entity_1"].occupancy_cases["industrial"]
        ...
tests/test_configuration.py
@@ -34,7 +34,12 @@ def test_Configuration():
    assert returned_config.boundaries_pathname == "/some/path/to/directory"
    assert returned_config.occupancies_to_run == ["residential", "commercial", "industrial"]
    assert returned_config.exposure_entities_to_run == ["all"]
-   assert returned_config.exposure_entities_code == "ISO3"
+   assert isinstance(returned_config.exposure_entities_code, dict)
+   assert len(returned_config.exposure_entities_code.keys()) == 8
+   for i in range(1, 9):
+       name = "Entity_%s" % (str(i))
+       code = "EN%s" % (str(i))
+       assert returned_config.exposure_entities_code[name] == code
    assert returned_config.number_cores == 4
    assert len(returned_config.database_built_up.keys()) == 4
    assert returned_config.database_built_up["host"] == "host.somewhere.xx"
@@ -184,22 +189,17 @@ def test_Configuration():
    assert returned_database_built_up is None

-   # Test case in which exposure_entities_code is a dictionary
+   # Test case in which exposure_entities_code is ISO3
    returned_config = Configuration(
        os.path.join(
            os.path.dirname(__file__),
            "data",
-           "config_for_testing_exposure_entities_code_dict.yml",
+           "config_for_testing_exposure_entities_code_iso3.yml",
        ),
        force_config_over_hierarchies=True,
    )
-   expected_exposure_entities_code = {
-       "Name 1": "NM1",
-       "Name 2": "XXX",
-   }
-   assert returned_config.exposure_entities_code == expected_exposure_entities_code
+   assert returned_config.exposure_entities_code == "ISO3"

    # Test case in which exposure_entities_code is a string different from ISO3
    with pytest.raises(SystemExit) as excinfo:
tests/test_dataunit.py
@@ -16,17 +16,21 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see http://www.gnu.org/licenses/.

import os
import logging
import geopandas
import shapely
import pytest
from copy import deepcopy
from pyproj import CRS
from gdeimporter.dataunit import DataUnit
from gdeimporter.configuration import Configuration
from gdeimporter.tools.database import Database

logger = logging.getLogger()


-def test_DataUnit():
+def test_DataUnit(test_db):
    # Test case in which the data unit IDs are strings (usual case)
    target_column_name = "ID_X"
    unit_ids = ["Unit_1", "123456"]
@@ -77,10 +81,14 @@ def test_DataUnit():
    for key in total_people:
        assert abs(returned_data_unit.total_people[key] - total_people[key]) < 1e-5

-   del total_people["Night"]
+   total_people_incomplete = deepcopy(total_people)
+   del total_people_incomplete["Night"]

    with pytest.raises(AssertionError) as excinfo:
        returned_data_unit = DataUnit(
-           unit_ids[0], geometries_table, target_column_name, total_people=total_people
+           unit_ids[0],
+           geometries_table,
+           target_column_name,
+           total_people=total_people_incomplete,
        )
    assert "AssertionError" in str(excinfo.type)
@@ -98,9 +106,109 @@ def test_DataUnit():
    for key in total_cost:
        assert abs(returned_data_unit.total_cost[key] - total_cost[key]) < 1e-5

-   del total_cost["Structural"]
+   total_cost_incomplete = deepcopy(total_cost)
+   del total_cost_incomplete["Structural"]

    with pytest.raises(AssertionError) as excinfo:
        returned_data_unit = DataUnit(
-           unit_ids[0], geometries_table, target_column_name, total_cost=total_cost
+           unit_ids[0],
+           geometries_table,
+           target_column_name,
+           total_cost=total_cost_incomplete,
        )
    assert "AssertionError" in str(excinfo.type)

    # Test writing to data_units table in the database
    # Database configuration
    config = Configuration(
        os.path.join(os.path.dirname(__file__), "data", "config_for_testing_good.yml")
    )

    # Testing two data units: first adds entry, second one updates existing entry
    for i in range(len(unit_ids)):
        returned_data_unit = DataUnit(
            unit_ids[i],
            geometries_table,
            target_column_name,
            total_buildings=total_buildings[i],
            total_dwellings=total_dwellings[i],
            total_people=total_people,
            total_cost=total_cost,
            currency=currency,