geomultisens / gms_preprocessing · Commits

Commit 48de5278, authored Nov 27, 2017 by Daniel Scheffler

    Added options schema and activated options validation. Improved Test_JobConfig(). Bugfixes.

Former-commit-id: f2915453
Parent: 6b122d4b

8 changed files


gms_preprocessing/config.py
@@ -19,6 +19,8 @@ from cerberus import Validator
 import pkgutil
 from typing import TYPE_CHECKING

+from .options_schema import gms_schema
+
 if TYPE_CHECKING:
     from .misc.database_tools import GMS_JOB  # noqa F401  # flake8 issue
@@ -40,8 +42,8 @@ class GMS_configuration(object):
 GMS_config = GMS_configuration()


-def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, exec_L1AP=None, exec_L1BP=None,
-               exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
+def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, json_config='', exec_L1AP=None,
+               exec_L1BP=None, exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
                allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO',
                tiling_block_size_XY=(2048, 2048), is_test=False, profiling=False, benchmark_global=False,
                path_procdata_scenes=None, path_procdata_MGRS=None, path_archive=None):
@@ -52,6 +54,7 @@ def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, exe
                        'Flink': keeps intermediate results in memory in order to save IO time
     :param db_host: host name of the server that runs the postgreSQL database
     :param reset: whether to reset the job status or not (default=False)
+    :param json_config: path to a JSON file containing configuration parameters or a string in JSON format
     :param exec_L1AP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
     :param exec_L1BP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
     :param exec_L1CP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
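
A minimal usage sketch of the new json_config parameter (illustrative only, not part of this diff; the job ID and file path below are placeholders):

    from gms_preprocessing.config import set_config

    # pass configuration inline as a JSON string (jsmin strips the /* ... */ comments)
    set_config(job_ID=26186662, db_host='localhost',
               json_config='{"exec_mode": "Flink" /* keep intermediate results in memory */}')

    # or point to a JSON options file on disk (placeholder path)
    set_config(job_ID=26186662, db_host='localhost',
               json_config='/path/to/gms_options.json')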
@@ -217,7 +220,7 @@ class JobConfig(object):
         # privates
         self._DB_job_record = None  # type: GMS_JOB
         self._DB_config_table = None  # type: dict
-        self._user_opts_defaults = None
+        self._kwargs_defaults = None

         # fixed attributes
         # possible values: 'pending', 'running', 'canceled', 'failed', 'finished_with_warnings',
@@ -235,14 +238,14 @@ class JobConfig(object):
         # args
         self.ID = ID
         self.db_host = db_host
-        self.user_opts = user_opts
+        self.kwargs = user_opts

         # database connection
         self.conn_database = "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=3" \
                              % self.db_host

         # get validated options dict from JSON-options
-        json_opts = self.get_json_opts_from_db(validate=True)
+        json_opts = self.get_json_opts(validate=True)

         gp = self.get_parameter
@@ -431,17 +434,17 @@ class JobConfig(object):
         self.data_list = self.get_data_list_of_current_jobID()

     @property
-    def user_opts_defaults(self):
-        if not self._user_opts_defaults:
+    def kwargs_defaults(self):
+        if not self._kwargs_defaults:
             a = getfullargspec(set_config)
-            self._user_opts_defaults = dict(zip(a.args[-len(a.defaults):], a.defaults))
+            self._kwargs_defaults = dict(zip(a.args[-len(a.defaults):], a.defaults))

-        return self._user_opts_defaults
+        return self._kwargs_defaults

     def get_parameter(self, key_user_opts, val_json=None, attr_db_job_record='', fallback=None):
         # 1. JobConfig parameters: parameters that are directly passed to JobConfig
-        if key_user_opts in self.user_opts and self.user_opts[key_user_opts] != self.user_opts_defaults[key_user_opts]:
-            return self.user_opts[key_user_opts]
+        if key_user_opts in self.kwargs and self.kwargs[key_user_opts] != self.kwargs_defaults[key_user_opts]:
+            return self.kwargs[key_user_opts]

         # 2. WebUI parameters: parameters that have been defined via WebUI
         if attr_db_job_record:
@@ -452,8 +455,8 @@ class JobConfig(object):
             return val_json

         # fallback: if nothing has been returned until here
-        if not fallback and key_user_opts in self.user_opts_defaults:
-            fallback = self.user_opts_defaults[key_user_opts]
+        if not fallback and key_user_opts in self.kwargs_defaults:
+            fallback = self.kwargs_defaults[key_user_opts]

         return fallback

     @property
@@ -505,7 +508,7 @@ class JobConfig(object):
         return VSSpecs

-    def get_json_opts_from_db(self, validate=True):
+    def get_json_opts(self, validate=True):
         """Get a dictionary of GMS config parameters according to the jobs table of the database.

         NOTE: Reads the default options from options_default.json and updates the values with those from database.
@@ -514,20 +517,45 @@ class JobConfig(object):
         default_options = get_options(os.path.join(os.path.dirname(pkgutil.get_loader("gms_preprocessing").path),
                                                    'options_default.json'), validation=validate)

+        if 'json_config' in self.kwargs and self.kwargs['json_config']:
+            if self.kwargs['json_config'].startswith("{"):
+                try:
+                    params_dict = json.loads(jsmin(self.kwargs['json_config']))
+                except JSONDecodeError:
+                    warnings.warn('The given JSON options string could not be decoded. '
+                                  'JSON decoder failed with the following error:')
+                    raise
+            elif os.path.isfile(self.kwargs['json_config']):
+                try:
+                    with open(self.kwargs['json_config'], 'r') as inF:
+                        params_dict = json.loads(jsmin(inF.read()))
+                except JSONDecodeError:
+                    warnings.warn('The given JSON options file %s could not be decoded. '
+                                  'JSON decoder failed with the following error:' % self.kwargs['json_config'])
+                    raise
+            else:
+                raise ValueError("The parameter 'json_config' must be a JSON formatted string or a JSON file on disk.")
+
+            # convert values to useful data types and update the default values
+            params_dict = json_to_python(params_dict)
+            default_options.update(params_dict)
+
         # update default options with those from DB
         if self.DB_job_record.analysis_parameter:
             try:
                 params_dict = json.loads(jsmin(self.DB_job_record.analysis_parameter))
             except JSONDecodeError:
-                warnings.warn('The given JSON options file could not be decoded. JSON decoder failed with the '
-                              'following error:')
+                warnings.warn('The advanced options given in the WebUI could not be decoded. '
+                              'JSON decoder failed with the following error:')
                 raise

-            db_options = json_to_python(params_dict)  # type: dict
-            default_options.update(db_options)
+            # convert values to useful data types and update the default values
+            params_dict = json_to_python(params_dict)
+            default_options.update(params_dict)

         if validate:
-            GMSValidator().validate(default_options)
+            GMSValidator(allow_unknown=True, schema=gms_schema).validate(default_options)

         json_options = default_options
         return json_options
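
The string/file branching above rests on jsmin stripping JavaScript-style comments before json.loads() parses the result. A short standalone sketch of that mechanism (illustrative values, not taken from the repository):

    import json
    from jsmin import jsmin

    json_config = '{"exec_mode": "Flink" /* keep results in memory */, "CPUs": 8}'
    params_dict = json.loads(jsmin(json_config))   # comments removed, then decoded
    print(params_dict)                             # {'exec_mode': 'Flink', 'CPUs': 8}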
@@ -984,9 +1012,6 @@ def is_GMSConfig_available():
         return False


-gms_schema = None
-
-
 def json_to_python(value):
     def is_number(s):
         try:
@@ -1022,17 +1047,14 @@ def json_to_python(value):

 class GMSValidator(Validator):
     def __init__(self, *args, **kwargs):
         # type: (list, dict) -> None
         """
         :param args: Arguments to be passed to cerberus.Validator
         :param kwargs: Keyword arguments to be passed to cerberus.Validator
         """
-        super(GMSValidator, self).__init__(*args, allow_unknown=True, schema=gms_schema, **kwargs)
+        super(GMSValidator, self).__init__(*args, **kwargs)

     def validate(self, document2validate, **kwargs):
-        warnings.warn("Currently options cannot be validated yet.")
-        return None  # FIXME
         if super(GMSValidator, self).validate(document=document2validate, **kwargs) is False:
             raise ValueError("Options is malformed: %s" % str(self.errors))
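
With the FIXME lines removed, schema violations now surface as exceptions. A hedged sketch of the resulting behaviour (assuming the imports resolve as introduced in this commit):

    from gms_preprocessing.config import GMSValidator
    from gms_preprocessing.options_schema import gms_schema

    validator = GMSValidator(allow_unknown=True, schema=gms_schema)
    try:
        # 'badvalue' is not in the allowed list ['Python', 'Flink'] (and required keys are missing)
        validator.validate({'exec_mode': 'badvalue'})
    except ValueError as err:
        print(err)   # "Options is malformed: ..."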
@@ -1051,7 +1073,7 @@ def get_options(target, validation=True):
         options = json_to_python(json.loads(jsmin(fl.read())))

         if validation is True:
-            GMSValidator().validate(options)
+            GMSValidator(allow_unknown=True, schema=gms_schema).validate(options)

         return options
     else:

gms_preprocessing/options_schema.py

 """Definition of gms options schema (as used by cerberus library)."""

-gms_schema = {}
+gms_schema = dict(
+    exec_mode=dict(type='string', required=True, allowed=['Python', 'Flink']),
+    db_host=dict(type='string', required=True),
+    CPUs=dict(type='integer', required=True, nullable=True),
+    allow_subMultiprocessing=dict(type='boolean', required=True),
+    disable_exception_handler=dict(type='boolean', required=True),
+    log_level=dict(type='string', required=True, allowed=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']),
+    tiling_block_size_XY=dict(type='list', required=True, schema=dict(type="integer"), minlength=2, maxlength=2),
+    is_test=dict(type='boolean', required=True),
+    profiling=dict(type='boolean', required=True),
+    benchmark_global=dict(type='boolean', required=True),
+    paths=dict(
+        type='dict', required=True,
+        schema=dict(
+            path_fileserver=dict(type='string', required=True),
+            path_archive=dict(type='string', required=True),
+            path_procdata_scenes=dict(type='string', required=True),
+            path_procdata_MGRS=dict(type='string', required=True),
+            path_tempdir=dict(type='string', required=True),
+            path_benchmarks=dict(type='string', required=True),
+            path_job_logs=dict(type='string', required=True),
+            path_spatIdxSrv=dict(type='string', required=True),
+            path_ac_tables=dict(type='string', required=True),
+            path_SNR_models=dict(type='string', required=True),
+            path_SRFs=dict(type='string', required=True),
+            path_dem_proc_srtm_90m=dict(type='string', required=True),
+            path_earthSunDist=dict(type='string', required=True),
+            path_solar_irr=dict(type='string', required=True),
+            path_cloud_classif=dict(type='string', required=True),
+            path_ECMWF_db=dict(type='string', required=True),
+        )),
+    processors=dict(
+        type='dict', required=True,
+        schema=dict(
+            general_opts=dict(type='dict', required=True, schema=dict(
+                skip_thermal=dict(type='boolean', required=True),
+                skip_pan=dict(type='boolean', required=True),
+                sort_bands_by_cwl=dict(type='boolean', required=True),
+                conversion_type_optical=dict(type='string', required=True, allowed=['Rad', 'TOA_Ref', 'BOA_Ref']),
+                conversion_type_thermal=dict(type='string', required=True, allowed=['Rad', 'Temp']),
+                scale_factor_TOARef=dict(type='integer', required=True),
+                scale_factor_BOARef=dict(type='integer', required=True),
+            )),
+            L1A_P=dict(type='dict', required=True, schema=dict(
+                run_processor=dict(type='boolean', required=True),
+                write_output=dict(type='boolean', required=True),
+                delete_output=dict(type='boolean', required=True),
+                SZA_SAA_calculation_accurracy=dict(type='string', required=False, allowed=['coarse', 'fine']),
+                export_VZA_SZA_SAA_RAA_stats=dict(type='boolean', required=True),
+            )),
+            L1B_P=dict(type='dict', required=True, schema=dict(
+                run_processor=dict(type='boolean', required=True),
+                write_output=dict(type='boolean', required=True),
+                delete_output=dict(type='boolean', required=True),
+                skip_coreg=dict(type='boolean', required=True),
+            )),
+            L1C_P=dict(type='dict', required=True, schema=dict(
+                run_processor=dict(type='boolean', required=True),
+                write_output=dict(type='boolean', required=True),
+                delete_output=dict(type='boolean', required=True),
+                cloud_masking_algorithm=dict(type='dict', required=False, schema={
+                    'Landsat-4': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
+                    'Landsat-5': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
+                    'Landsat-7': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
+                    'Landsat-8': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
+                    'Sentinel-2A': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
+                    'Sentinel-2B': dict(type='string', required=True, allowed=['FMASK', 'Classical Bayesian', 'SICOR']),
+                }),
+                export_L1C_obj_dumps=dict(type='boolean', required=True),
+                scale_factor_errors_ac=dict(type='integer', required=True),
+                auto_download_ecmwf=dict(type='boolean', required=True),
+            )),
+            L2A_P=dict(type='dict', required=True, schema=dict(
+                run_processor=dict(type='boolean', required=True),
+                write_output=dict(type='boolean', required=True),
+                delete_output=dict(type='boolean', required=True),
+                align_coord_grids=dict(type='boolean', required=True),
+                match_gsd=dict(type='boolean', required=True),
+            )),
+            L2B_P=dict(type='dict', required=True, schema=dict(
+                run_processor=dict(type='boolean', required=True),
+                write_output=dict(type='boolean', required=True),
+                delete_output=dict(type='boolean', required=True),
+            )),
+            L2C_P=dict(type='dict', required=True, schema=dict(
+                run_processor=dict(type='boolean', required=True),
+                write_output=dict(type='boolean', required=True),
+                delete_output=dict(type='boolean', required=True),
+            )),
+        )),
+    usecase=dict(
+        type='dict', required=True,
+        schema=dict(
+            virtual_sensor_id=dict(type='integer', required=True),  # TODO add possible values
+            datasetid_spatial_ref=dict(type='integer', required=True, nullable=True),
+            datasetid_spectral_ref=dict(type='integer', required=True),
+            target_CWL=dict(type='list', required=False, schema=dict(type='float')),
+            target_FWHM=dict(type='list', required=False, schema=dict(type='float')),
+            target_gsd=dict(type='list', required=False, schema=dict(type='float'), maxlength=2),
+            target_epsg_code=dict(type='integer', required=True, nullable=True),
+            spatial_ref_gridx=dict(type='list', required=False, schema=dict(type='float'), maxlength=2),
+            spatial_ref_gridy=dict(type='list', required=False, schema=dict(type='float'), maxlength=2),
+        )),
+)
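
For orientation, a hypothetical options fragment that the nested 'processors' → 'L1B_P' rules above would accept (key names taken from the schema, values invented); cerberus descends into sub-dicts via the 'schema' rule:

    from cerberus import Validator
    from gms_preprocessing.options_schema import gms_schema

    l1b_section = {
        'run_processor': True,    # 'boolean', required
        'write_output': False,    # 'boolean', required
        'delete_output': True,    # 'boolean', required
        'skip_coreg': False,      # 'boolean', required
    }

    sub_validator = Validator(schema=gms_schema['processors']['schema']['L1B_P']['schema'])
    assert sub_validator.validate(l1b_section)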

requirements.txt

@@ -26,3 +26,5 @@ psycopg2
 # fmask  # not pip installable
 six
 tqdm
+jsmin
+cerberus

requirements_pip.txt

@@ -12,3 +12,5 @@ geoalchemy2
 sqlalchemy
 six
 tqdm
+jsmin
+cerberus

setup.py

@@ -14,7 +14,7 @@ with open('HISTORY.rst') as history_file:
 requirements = [
     'matplotlib', 'numpy', 'scikit-learn', 'scipy', 'gdal', 'pyproj', 'shapely', 'ephem', 'pyorbital', 'dill',
     'pytz', 'pandas', 'numba', 'spectral>=0.16', 'geopandas', 'iso8601', 'pyinstrument', 'geoalchemy2', 'sqlalchemy',
-    'psycopg2', 'py_tools_ds>=0.12.4', 'geoarray>=0.7.1', 'arosics>=0.6.6', 'six', 'tqdm'
+    'psycopg2', 'py_tools_ds>=0.12.4', 'geoarray>=0.7.1', 'arosics>=0.6.6', 'six', 'tqdm', 'jsmin', 'cerberus'
     # spectral<0.16 has some problems with writing signed integer 8bit data
     # fmask  # conda install -c conda-forge python-fmask
     # 'pyhdf',  # conda install --yes -c conda-forge pyhdf

tests/CI_docker/context/environment_gms_preprocessing.yml

@@ -69,6 +69,8 @@ dependencies:
     - sqlalchemy
     - psycopg2
     - pandas
+    - jsmin
+    - cerberus
     - py_tools_ds>=0.12.4
     - geoarray>=0.7.0
     - arosics>=0.6.6

tests/test_config.py

@@ -10,6 +10,7 @@ Tests for gms_preprocessing.config
 import os
 from unittest import TestCase
+from json import JSONDecodeError

 from gms_preprocessing import __path__
 from gms_preprocessing.config import get_options

@@ -31,6 +32,37 @@ class Test_JobConfig(TestCase):
         self.jobID = 26186662
         self.db_host = 'geoms'

-    def test(self):
+    def test_plain_args(self):
         cfg = JobConfig(self.jobID, self.db_host)
-        print(cfg)
+        self.assertIsInstance(cfg, JobConfig)
+
+    def test_jsonconfig_str_allfine(self):
+        cfg = '{"a": 1 /*comment*/, "b":2}'
+        cfg = JobConfig(self.jobID, self.db_host, json_config=cfg)
+        self.assertIsInstance(cfg, JobConfig)
+
+    def test_jsonconfig_str_nojson(self):
+        cfg = 'dict(a=1 /*comment*/, b=2)'
+        with self.assertRaises(ValueError):
+            JobConfig(self.jobID, self.db_host, json_config=cfg)
+
+    def test_jsonconfig_str_badcomment(self):
+        cfg = '{"a": 1 /comment*/, "b":2}'
+        with self.assertWarns(UserWarning), self.assertRaises(JSONDecodeError):
+            JobConfig(self.jobID, self.db_host, json_config=cfg)
+
+    def test_jsonconfig_str_undecodable_val(self):
+        cfg = '{"a": None /comment*/, "b":2}'
+        with self.assertWarns(UserWarning), self.assertRaises(JSONDecodeError):
+            JobConfig(self.jobID, self.db_host, json_config=cfg)
+
+    def test_jsonconfig_str_schema_violation(self):
+        cfg = '{"exec_mode": "badvalue"}'
+        with self.assertRaises(ValueError):
+            JobConfig(self.jobID, self.db_host, json_config=cfg)
+
+    def test_jsonconfig_file(self):
+        cfg = os.path.join(__path__[0], 'options_default.json')
+        cfg = JobConfig(self.jobID, self.db_host, json_config=cfg)
+        self.assertIsInstance(cfg, JobConfig)

tests/test_exception_handler.py

@@ -24,6 +24,7 @@ class BaseTest_ExceptionHandler:
     def get_process_controller(self, jobID):
         self.PC = process_controller(jobID, parallelization_level='scenes', db_host='geoms',
                                      job_config_kwargs=dict(is_test=True, log_level='DEBUG'))
+        self.PC.config.disable_exception_handler = False

         # update attributes of DB_job_record and related DB entry
         self.PC.DB_job_record.reset_job_progress()

@@ -57,7 +58,7 @@ class BaseTest_ExceptionHandler:
 class Test_ExceptionHandler_NoSubsystems(BaseTest_ExceptionHandler.Test_ExceptionHandler):
     def setUp(self):
-        super().get_process_controller(26186261)  # Landsat-8 Coll. Data
+        super(Test_ExceptionHandler_NoSubsystems, self).get_process_controller(26186261)  # Landsat-8 Coll. Data

     def test_L1A_mapper_success_progress_stats(self):
         """Check correctness of progress stats if scene succeeds."""

@@ -73,7 +74,8 @@ class Test_ExceptionHandler_NoSubsystems(BaseTest_ExceptionHandler.Test_Exceptio
     def test_disabled_exception_handler(self):
         """Check if the exception handler raises exceptions if CFG.Job.disable_exception_handler = True."""
         self.PC.config.disable_exception_handler = True
-        self.assertRaises(RuntimeError, self.dummy_gms_mapper_fail, self.PC.config.data_list[0])
+        with self.assertRaises(RuntimeError):
+            self.dummy_gms_mapper_fail(self.PC.config.data_list[0])


 class Test_ExceptionHandler_Subsystems(BaseTest_ExceptionHandler.Test_ExceptionHandler):