Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
geomultisens
gms_preprocessing
Commits
175e57f1
Commit
175e57f1
authored
Nov 28, 2017
by
Daniel Scheffler
Browse files
Merge branch 'enhancement/provide_more_options'
Former-commit-id:
05549d60
parents
697e52b0
68ff63b7
Changes
16
Hide whitespace changes
Inline
Side-by-side
gms_preprocessing/algorithms/L1A_P.py
View file @
175e57f1
...
...
@@ -271,7 +271,7 @@ class L1A_object(GMS_object):
def
calc_TOARadRefTemp
(
self
,
subset
=
None
):
"""Convert DN, Rad or TOA_Ref data to TOA Reflectance, to Radiance or to Surface Temperature
(depending on CFG.
conversion_type_optical and conversion_type
_thermal).
(depending on CFG.
target_radunit_optical and target_radunit
_thermal).
The function can be executed by a L1A_object representing a full scene or a tile. To process a file from disk
in tiles, provide an item of self.tile_pos as the 'subset' argument."""
...
...
@@ -305,7 +305,7 @@ class L1A_object(GMS_object):
for
optical_thermal
in
[
'optical'
,
'thermal'
]:
if
optical_thermal
not
in
self
.
dict_LayerOptTherm
.
values
():
continue
conv
=
getattr
(
CFG
,
'
conversion_type
_%s'
%
optical_thermal
)
conv
=
getattr
(
CFG
,
'
target_radunit
_%s'
%
optical_thermal
)
conv
=
conv
if
conv
!=
'BOA_Ref'
else
'TOA_Ref'
assert
conv
in
[
'Rad'
,
'TOA_Ref'
,
'Temp'
],
'Unsupported conversion type: %s'
%
conv
arr_desc
=
self
.
arr_desc
.
split
(
'/'
)[
0
]
if
optical_thermal
==
'optical'
else
self
.
arr_desc
.
split
(
'/'
)[
-
1
]
...
...
@@ -390,8 +390,8 @@ class L1A_object(GMS_object):
self
.
update_spec_vals_according_to_dtype
(
'int16'
)
tiles_desc
=
'_'
.
join
([
desc
for
op_th
,
desc
in
zip
([
'optical'
,
'thermal'
],
[
CFG
.
conversion_type
_optical
,
CFG
.
conversion_type
_thermal
])
[
CFG
.
target_radunit
_optical
,
CFG
.
target_radunit
_thermal
])
if
desc
in
self
.
dict_LayerOptTherm
.
values
()])
self
.
arr
=
dataOut
...
...
gms_preprocessing/algorithms/geoprocessing.py
View file @
175e57f1
...
...
@@ -52,8 +52,8 @@ class GEOPROCESSING(object):
def
__init__
(
self
,
geodata
,
logger
,
workspace
=
None
,
subset
=
None
,
v
=
None
):
self
.
logger
=
logger
self
.
subset
=
subset
self
.
conversion_type
_optical
=
''
self
.
conversion_type
_thermal
=
''
self
.
target_radunit
_optical
=
''
self
.
target_radunit
_thermal
=
''
self
.
outpath_conv
=
None
# check gdal environment
if
v
is
not
None
:
...
...
@@ -587,7 +587,7 @@ class GEOPROCESSING(object):
extent
=
[
self
.
cols
,
self
.
rows
]
if
self
.
subset
is
not
None
and
self
.
subset
[
0
]
==
'custom'
and
\
self
.
conversion_type
_optical
==
''
and
self
.
conversion_type
_thermal
==
''
:
self
.
target_radunit
_optical
==
''
and
self
.
target_radunit
_thermal
==
''
:
# conversion to Rad or Ref overwrites self.inDs
# => custom bandsList contains bands that are NOT in range(self.bands)
bands2process
=
self
.
bandsList
...
...
gms_preprocessing/model/gms_object.py
View file @
175e57f1
...
...
@@ -855,8 +855,8 @@ class GMS_object(Dataset):
self
.
logger
.
info
(
"Writing tiles '%s' temporarily to disk..."
%
tiles
[
0
][
'desc'
])
outpath
=
os
.
path
.
join
(
self
.
ExtractedFolder
,
'%s__%s.%s'
%
(
self
.
baseN
,
tiles
[
0
][
'desc'
],
self
.
outInterleave
))
if
CFG
.
conversion_type
_optical
in
tiles
[
0
][
'desc'
]
or
\
CFG
.
conversion_type
_thermal
in
tiles
[
0
][
'desc'
]:
if
CFG
.
target_radunit
_optical
in
tiles
[
0
][
'desc'
]
or
\
CFG
.
target_radunit
_thermal
in
tiles
[
0
][
'desc'
]:
self
.
meta_odict
=
self
.
MetaObj
.
to_odict
()
# important in order to keep geotransform/projection
self
.
arr_desc
=
tiles
[
0
][
'desc'
]
self
.
arr
=
outpath
...
...
gms_preprocessing/model/metadata.py
View file @
175e57f1
...
...
@@ -1414,12 +1414,12 @@ class METADATA(object):
'BOA_Ref'
:
'BOA_Reflectance in [0-%d]'
%
CFG
.
scale_factor_BOARef
,
'Temp'
:
'Degrees Celsius with scale factor = 100'
}
if
list
(
set
(
dict_LayerOptTherm
.
values
()))
==
[
'optical'
]:
self
.
PhysUnit
=
dict_conv_physUnit
[
CFG
.
conversion_type
_optical
]
self
.
PhysUnit
=
dict_conv_physUnit
[
CFG
.
target_radunit
_optical
]
elif
list
(
set
(
dict_LayerOptTherm
.
values
()))
==
[
'thermal'
]:
self
.
PhysUnit
=
dict_conv_physUnit
[
CFG
.
conversion_type
_thermal
]
self
.
PhysUnit
=
dict_conv_physUnit
[
CFG
.
target_radunit
_thermal
]
elif
sorted
(
list
(
set
(
dict_LayerOptTherm
.
values
())))
==
[
'optical'
,
'thermal'
]:
self
.
PhysUnit
=
[
'Optical bands: %s'
%
dict_conv_physUnit
[
CFG
.
conversion_type
_optical
],
'Thermal bands: %s'
%
dict_conv_physUnit
[
CFG
.
conversion_type
_thermal
]]
self
.
PhysUnit
=
[
'Optical bands: %s'
%
dict_conv_physUnit
[
CFG
.
target_radunit
_optical
],
'Thermal bands: %s'
%
dict_conv_physUnit
[
CFG
.
target_radunit
_thermal
]]
else
:
logger
=
self
.
logger
if
hasattr
(
self
,
'logger'
)
else
temp_logger
assert
logger
,
"ERROR: Physical unit could not be determined due to unexpected 'dict_LayerOptTherm'. "
\
...
...
@@ -1478,7 +1478,7 @@ class METADATA(object):
# copy directly compatible keys
Meta
=
collections
.
OrderedDict
()
# Meta['description'] = descr_dic[self.Satellite + '_' + CFG.
conversion_type
_optical]
# Meta['description'] = descr_dic[self.Satellite + '_' + CFG.
target_radunit
_optical]
for
odictKey
in
enviHdr_keyOrder
:
if
odictKey
in
map_odictKeys_objAttrnames
:
...
...
@@ -1807,7 +1807,7 @@ def get_LayerBandsAssignment(GMS_identifier, nBands=None, ignore_usecase=False,
# remove those bands that are excluded by atmospheric corrections if proc_level >= L1C
if GMS_identifier['proc_level'] not in [None, 'L1A', 'L1B']: # TODO replace with enum procL
if CFG.
conversion_type
_optical == 'BOA_Ref':
if CFG.
target_radunit
_optical == 'BOA_Ref':
path_ac_options = get_path_ac_options(GMS_identifier)
if path_ac_options and os.path.exists(path_ac_options):
# FIXME this does not work for L7
...
...
gms_preprocessing/options/config.py
View file @
175e57f1
...
...
@@ -20,7 +20,7 @@ import pkgutil
from
pprint
import
pformat
from
typing
import
TYPE_CHECKING
from
.options_schema
import
gms_schema
from
.options_schema
import
gms_schema
_input
,
gms_schema_config_output
if
TYPE_CHECKING
:
from
gms_preprocessing.misc.database_tools
import
GMS_JOB
# noqa F401 # flake8 issue
...
...
@@ -40,14 +40,15 @@ class GMS_configuration(object):
raise
EnvironmentError
(
"Config has not been set already on this machine. Run 'set_config()' first!'"
)
GMS_config
=
GMS_configuration
()
GMS_config
=
GMS_configuration
()
# type: JobConfig
path_gmslib
=
os
.
path
.
dirname
(
pkgutil
.
get_loader
(
"gms_preprocessing"
).
path
)
path_options_default
=
os
.
path
.
join
(
path_gmslib
,
'options'
,
'options_default.json'
)
def
set_config
(
job_ID
,
exec_mode
=
'Python'
,
db_host
=
'localhost'
,
reset
=
False
,
json_config
=
''
,
exec_L1AP
=
None
,
def
set_config
(
job_ID
,
json_config
=
''
,
exec_mode
=
'Python'
,
parallelization_level
=
'scenes'
,
db_host
=
'localhost'
,
reset_status
=
False
,
delete_old_output
=
False
,
exec_L1AP
=
None
,
exec_L1BP
=
None
,
exec_L1CP
=
None
,
exec_L2AP
=
None
,
exec_L2BP
=
None
,
exec_L2CP
=
None
,
CPUs
=
None
,
allow_subMultiprocessing
=
True
,
disable_exception_handler
=
True
,
log_level
=
'INFO'
,
tiling_block_size_XY
=
(
2048
,
2048
),
is_test
=
False
,
profiling
=
False
,
benchmark_global
=
False
,
...
...
@@ -55,11 +56,15 @@ def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, jso
"""Set up a configuration for a new gms_preprocessing job!
:param job_ID: job ID of the job to be executed, e.g. 123456 (must be present in database)
:param json_config path to JSON file containing configuration parameters or a string in JSON format
:param exec_mode: 'Python': writes intermediate results to disk in order to save memory
'Flink': keeps intermediate results in memory in order to save IO time
:param parallelization_level: <str> choices: 'scenes' - parallelization on scene-level
'tiles' - parallelisation on tile-level
:param db_host: host name of the server that runs the postgreSQL database
:param reset: whether to reset the job status or not (default=False)
:param json_config path to JSON file containing configuration parameters or a string in JSON format
:param reset_status: whether to reset the job status or not (default=False)
:param delete_old_output: <bool> whether to delete previously created output of the given job ID
before running the job (default = False)
:param exec_L1AP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L1BP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L1CP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
...
...
@@ -86,7 +91,7 @@ def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, jso
output path to store processed MGRS tiles
:param path_archive: input path where downloaded data are stored
"""
if
not
hasattr
(
builtins
,
'GMS_JobConfig'
)
or
reset
:
if
not
hasattr
(
builtins
,
'GMS_JobConfig'
)
or
reset
_status
:
kwargs
=
dict
([
x
for
x
in
locals
().
items
()
if
x
[
0
]
!=
"self"
and
not
x
[
0
].
startswith
(
'__'
)])
builtins
.
GMS_JobConfig
=
JobConfig
(
job_ID
,
**
kwargs
)
...
...
@@ -94,7 +99,7 @@ def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, jso
class
JobConfig
(
object
):
def
__init__
(
self
,
ID
,
db_host
=
'localhost'
,
**
user_opts
):
def
__init__
(
self
,
ID
,
**
user_opts
):
"""Create a job configuration
Workflow:
...
...
@@ -104,8 +109,8 @@ class JobConfig(object):
# => zuerst JobConfig auf Basis von JSON erstellen
# 2. dann überschreiben mit user-defined parametern (entweder init-parameter oder db-settings per webapp)
:param ID:
job ID of the job to be executed, e.g. 123456 (must be present in database)
:param
db_host: host name of the server that runs the postgreSQL database
:param ID: job ID of the job to be executed, e.g. 123456 (must be present in database)
:param
user_opts keyword arguments to be passed to gms_preprocessing.set_config()
"""
# privates
self
.
_DB_job_record
=
None
# type: GMS_JOB
...
...
@@ -127,10 +132,10 @@ class JobConfig(object):
# args
self
.
ID
=
ID
self
.
db_host
=
db_host
self
.
kwargs
=
user_opts
# database connection
self
.
db_host
=
user_opts
[
'db_host'
]
self
.
conn_database
=
"dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=3"
\
%
self
.
db_host
...
...
@@ -147,8 +152,12 @@ class JobConfig(object):
self
.
exec_mode
=
\
gp
(
'exec_mode'
,
json_globts
[
'exec_mode'
])
self
.
parallelization_level
=
\
gp
(
'parallelization_level'
,
json_globts
[
'parallelization_level'
])
self
.
CPUs
=
\
gp
(
'CPUs'
,
json_globts
[
'CPUs'
],
fallback
=
multiprocessing
.
cpu_count
())
self
.
delete_old_output
=
\
gp
(
'delete_old_output'
,
json_globts
[
'delete_old_output'
])
self
.
allow_subMultiprocessing
=
\
gp
(
'allow_subMultiprocessing'
,
json_globts
[
'allow_subMultiprocessing'
])
self
.
disable_exception_handler
=
\
...
...
@@ -234,10 +243,10 @@ class JobConfig(object):
gp
(
'skip_pan'
,
json_processors
[
'general_opts'
][
'skip_pan'
])
self
.
sort_bands_by_cwl
=
\
gp
(
'sort_bands_by_cwl'
,
json_processors
[
'general_opts'
][
'sort_bands_by_cwl'
])
self
.
conversion_type
_optical
=
\
gp
(
'
conversion_type
_optical'
,
json_processors
[
'general_opts'
][
'
conversion_type
_optical'
])
self
.
conversion_type
_thermal
=
\
gp
(
'
conversion_type
_thermal'
,
json_processors
[
'general_opts'
][
'
conversion_type
_thermal'
])
self
.
target_radunit
_optical
=
\
gp
(
'
target_radunit
_optical'
,
json_processors
[
'general_opts'
][
'
target_radunit
_optical'
])
self
.
target_radunit
_thermal
=
\
gp
(
'
target_radunit
_thermal'
,
json_processors
[
'general_opts'
][
'
target_radunit
_thermal'
])
self
.
scale_factor_TOARef
=
\
gp
(
'scale_factor_TOARef'
,
json_processors
[
'general_opts'
][
'scale_factor_TOARef'
])
self
.
scale_factor_BOARef
=
\
...
...
@@ -335,6 +344,7 @@ class JobConfig(object):
############
self
.
validate_exec_configs
()
GMSValidator
(
allow_unknown
=
True
,
schema
=
gms_schema_config_output
).
validate
(
self
.
to_dict
())
@
property
def
kwargs_defaults
(
self
):
...
...
@@ -368,7 +378,7 @@ class JobConfig(object):
return
getattr
(
self
.
DB_job_record
,
attr_db_job_record
)
# 3. JSON parameters: parameters that have been defined via JSON Input (command line or advanced UI params)
if
val_json
and
val_json
is
not
Non
e
:
if
val_json
or
val_json
is
Fals
e
:
return
val_json
# fallback: if nothing has been returned until here
...
...
@@ -471,7 +481,7 @@ class JobConfig(object):
default_options
.
update
(
params_dict
)
if
validate
:
GMSValidator
(
allow_unknown
=
True
,
schema
=
gms_schema
).
validate
(
default_options
)
GMSValidator
(
allow_unknown
=
True
,
schema
=
gms_schema
_input
).
validate
(
default_options
)
json_options
=
default_options
return
json_options
...
...
@@ -628,7 +638,7 @@ class JobConfig(object):
def
is_GMSConfig_available
():
try
:
if
GMS_config
.
job
is
not
None
:
if
GMS_config
is
not
None
:
return
True
except
(
EnvironmentError
,
OSError
):
return
False
...
...
@@ -717,7 +727,7 @@ def get_options(target, validation=True):
options
=
json_to_python
(
json
.
loads
(
jsmin
(
fl
.
read
())))
if
validation
is
True
:
GMSValidator
(
allow_unknown
=
True
,
schema
=
gms_schema
).
validate
(
options
)
GMSValidator
(
allow_unknown
=
True
,
schema
=
gms_schema
_input
).
validate
(
options
)
return
options
else
:
...
...
gms_preprocessing/options/options_default.json
View file @
175e57f1
{
"global_opts"
:
{
"exec_mode"
:
"Python"
,
/*
"Python"
or
"Flink"
*/
"parallelization_level"
:
"scenes"
,
/*
"scenes"
or
"tiles"
*/
"db_host"
:
"localhost"
,
"CPUs"
:
"None"
,
/*number
of
CPU
cores
to
be
used
for
processing
(default:
"None"
->
use
all
available)*/
"delete_old_output"
:
false
,
/*whether
to
delete
previously
created
output
of
the
given
job
ID*/
"allow_subMultiprocessing"
:
true
,
/*allow
multiprocessing
within
multiprocessing
workers*/
"disable_exception_handler"
:
false
,
/*enable/disable
automatic
handling
of
unexpected
exceptions*/
"log_level"
:
"INFO"
,
/*the
logging
level
to
be
used
(choices:
'DEBUG'
,
'INFO'
,
'WARNING'
,
'ERROR'
,
'CRITICAL';*/
...
...
@@ -40,8 +42,8 @@
"skip_thermal"
:
true
,
"skip_pan"
:
true
,
"sort_bands_by_cwl"
:
true
,
"
conversion_type
_optical"
:
"BOA_Ref"
,
/*'Rad'
/
'TOA_Ref'
/
'BOA_Ref'*/
"
conversion_type
_thermal"
:
"Rad"
,
/*'Rad'
/
'Temp'*/
"
target_radunit
_optical"
:
"BOA_Ref"
,
/*'Rad'
/
'TOA_Ref'
/
'BOA_Ref'*/
"
target_radunit
_thermal"
:
"Rad"
,
/*'Rad'
/
'Temp'*/
"scale_factor_TOARef"
:
10000
,
"scale_factor_BOARef"
:
10000
},
...
...
gms_preprocessing/options/options_schema.py
View file @
175e57f1
"""Definition of gms options schema (as used by cerberus library)."""
gms_schema
=
dict
(
gms_schema
_input
=
dict
(
global_opts
=
dict
(
type
=
'dict'
,
required
=
Tru
e
,
type
=
'dict'
,
required
=
Fals
e
,
schema
=
dict
(
exec_mode
=
dict
(
type
=
'string'
,
required
=
True
,
allowed
=
[
'Python'
,
'Flink'
]),
db_host
=
dict
(
type
=
'string'
,
required
=
True
),
CPUs
=
dict
(
type
=
'integer'
,
required
=
True
,
nullable
=
True
),
allow_subMultiprocessing
=
dict
(
type
=
'boolean'
,
required
=
True
),
disable_exception_handler
=
dict
(
type
=
'boolean'
,
required
=
True
),
log_level
=
dict
(
type
=
'string'
,
required
=
True
,
allowed
=
[
'DEBUG'
,
'INFO'
,
'WARNING'
,
'ERROR'
,
'CRITICAL'
]),
tiling_block_size_XY
=
dict
(
type
=
'list'
,
required
=
True
,
schema
=
dict
(
type
=
"integer"
),
minlength
=
2
,
exec_mode
=
dict
(
type
=
'string'
,
required
=
False
,
allowed
=
[
'Python'
,
'Flink'
]),
parallelization_level
=
dict
(
type
=
'string'
,
required
=
False
,
allowed
=
[
'scenes'
,
'tiles'
]),
db_host
=
dict
(
type
=
'string'
,
required
=
False
),
CPUs
=
dict
(
type
=
'integer'
,
required
=
False
,
nullable
=
True
),
delete_old_output
=
dict
(
type
=
'boolean'
,
required
=
False
),
allow_subMultiprocessing
=
dict
(
type
=
'boolean'
,
required
=
False
),
disable_exception_handler
=
dict
(
type
=
'boolean'
,
required
=
False
),
log_level
=
dict
(
type
=
'string'
,
required
=
False
,
allowed
=
[
'DEBUG'
,
'INFO'
,
'WARNING'
,
'ERROR'
,
'CRITICAL'
]),
tiling_block_size_XY
=
dict
(
type
=
'list'
,
required
=
False
,
schema
=
dict
(
type
=
"integer"
),
minlength
=
2
,
maxlength
=
2
),
is_test
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
profiling
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
benchmark_global
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
is_test
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
profiling
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
benchmark_global
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
)),
paths
=
dict
(
type
=
'dict'
,
required
=
Tru
e
,
type
=
'dict'
,
required
=
Fals
e
,
schema
=
dict
(
path_fileserver
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_archive
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_procdata_scenes
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_procdata_MGRS
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_tempdir
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_benchmarks
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_job_logs
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_spatIdxSrv
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_ac_tables
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_SNR_models
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_SRFs
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_dem_proc_srtm_90m
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_earthSunDist
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_solar_irr
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_cloud_classif
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_ECMWF_db
=
dict
(
type
=
'string'
,
required
=
Tru
e
),
path_fileserver
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_archive
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_procdata_scenes
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_procdata_MGRS
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_tempdir
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_benchmarks
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_job_logs
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_spatIdxSrv
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_ac_tables
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_SNR_models
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_SRFs
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_dem_proc_srtm_90m
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_earthSunDist
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_solar_irr
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_cloud_classif
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
path_ECMWF_db
=
dict
(
type
=
'string'
,
required
=
Fals
e
),
)),
processors
=
dict
(
type
=
'dict'
,
required
=
Tru
e
,
type
=
'dict'
,
required
=
Fals
e
,
schema
=
dict
(
general_opts
=
dict
(
type
=
'dict'
,
required
=
Tru
e
,
schema
=
dict
(
skip_thermal
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
skip_pan
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
sort_bands_by_cwl
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
conversion_type
_optical
=
dict
(
type
=
'string'
,
required
=
Tru
e
,
allowed
=
[
'Rad'
,
'TOA_Ref'
,
'BOA_Ref'
]),
conversion_type
_thermal
=
dict
(
type
=
'string'
,
required
=
Tru
e
,
allowed
=
[
'Rad'
,
'Temp'
]),
scale_factor_TOARef
=
dict
(
type
=
'integer'
,
required
=
Tru
e
),
scale_factor_BOARef
=
dict
(
type
=
'integer'
,
required
=
Tru
e
),
general_opts
=
dict
(
type
=
'dict'
,
required
=
Fals
e
,
schema
=
dict
(
skip_thermal
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
skip_pan
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
sort_bands_by_cwl
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
target_radunit
_optical
=
dict
(
type
=
'string'
,
required
=
Fals
e
,
allowed
=
[
'Rad'
,
'TOA_Ref'
,
'BOA_Ref'
]),
target_radunit
_thermal
=
dict
(
type
=
'string'
,
required
=
Fals
e
,
allowed
=
[
'Rad'
,
'Temp'
]),
scale_factor_TOARef
=
dict
(
type
=
'integer'
,
required
=
Fals
e
),
scale_factor_BOARef
=
dict
(
type
=
'integer'
,
required
=
Fals
e
),
)),
L1A
=
dict
(
type
=
'dict'
,
required
=
Tru
e
,
schema
=
dict
(
run_processor
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
write_output
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
delete_output
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
L1A
=
dict
(
type
=
'dict'
,
required
=
Fals
e
,
schema
=
dict
(
run_processor
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
write_output
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
delete_output
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
SZA_SAA_calculation_accurracy
=
dict
(
type
=
'string'
,
required
=
False
,
allowed
=
[
'coarse'
,
'fine'
]),
export_VZA_SZA_SAA_RAA_stats
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
export_VZA_SZA_SAA_RAA_stats
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
)),
L1B
=
dict
(
type
=
'dict'
,
required
=
Tru
e
,
schema
=
dict
(
run_processor
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
write_output
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
delete_output
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
skip_coreg
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
L1B
=
dict
(
type
=
'dict'
,
required
=
Fals
e
,
schema
=
dict
(
run_processor
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
write_output
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
delete_output
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
skip_coreg
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
)),
L1C
=
dict
(
type
=
'dict'
,
required
=
Tru
e
,
schema
=
dict
(
run_processor
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
write_output
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
delete_output
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
L1C
=
dict
(
type
=
'dict'
,
required
=
Fals
e
,
schema
=
dict
(
run_processor
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
write_output
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
delete_output
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
cloud_masking_algorithm
=
dict
(
type
=
'dict'
,
required
=
False
,
schema
=
{
'Landsat-4'
:
dict
(
type
=
'string'
,
required
=
True
,
allowed
=
[
'FMASK'
,
'Classical Bayesian'
,
'SICOR'
]),
'Landsat-5'
:
dict
(
type
=
'string'
,
required
=
True
,
allowed
=
[
'FMASK'
,
'Classical Bayesian'
,
'SICOR'
]),
'Landsat-7'
:
dict
(
type
=
'string'
,
required
=
True
,
allowed
=
[
'FMASK'
,
'Classical Bayesian'
,
'SICOR'
]),
'Landsat-8'
:
dict
(
type
=
'string'
,
required
=
True
,
allowed
=
[
'FMASK'
,
'Classical Bayesian'
,
'SICOR'
]),
'Sentinel-2A'
:
dict
(
type
=
'string'
,
required
=
True
,
allowed
=
[
'FMASK'
,
'Classical Bayesian'
,
'SICOR'
]),
'Sentinel-2B'
:
dict
(
type
=
'string'
,
required
=
True
,
allowed
=
[
'FMASK'
,
'Classical Bayesian'
,
'SICOR'
]),
'Landsat-4'
:
dict
(
type
=
'string'
,
required
=
False
,
allowed
=
[
'FMASK'
,
'Classical Bayesian'
,
'SICOR'
]),
'Landsat-5'
:
dict
(
type
=
'string'
,
required
=
False
,
allowed
=
[
'FMASK'
,
'Classical Bayesian'
,
'SICOR'
]),
'Landsat-7'
:
dict
(
type
=
'string'
,
required
=
False
,
allowed
=
[
'FMASK'
,
'Classical Bayesian'
,
'SICOR'
]),
'Landsat-8'
:
dict
(
type
=
'string'
,
required
=
False
,
allowed
=
[
'FMASK'
,
'Classical Bayesian'
,
'SICOR'
]),
'Sentinel-2A'
:
dict
(
type
=
'string'
,
required
=
False
,
allowed
=
[
'FMASK'
,
'Classical Bayesian'
,
'SICOR'
]),
'Sentinel-2B'
:
dict
(
type
=
'string'
,
required
=
False
,
allowed
=
[
'FMASK'
,
'Classical Bayesian'
,
'SICOR'
]),
}),
export_L1C_obj_dumps
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
scale_factor_errors_ac
=
dict
(
type
=
'integer'
,
required
=
Tru
e
),
auto_download_ecmwf
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
export_L1C_obj_dumps
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
scale_factor_errors_ac
=
dict
(
type
=
'integer'
,
required
=
Fals
e
),
auto_download_ecmwf
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
)),
L2A
=
dict
(
type
=
'dict'
,
required
=
Tru
e
,
schema
=
dict
(
run_processor
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
write_output
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
delete_output
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
align_coord_grids
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
match_gsd
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
L2A
=
dict
(
type
=
'dict'
,
required
=
Fals
e
,
schema
=
dict
(
run_processor
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
write_output
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
delete_output
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
align_coord_grids
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
match_gsd
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
)),
L2B
=
dict
(
type
=
'dict'
,
required
=
Tru
e
,
schema
=
dict
(
run_processor
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
write_output
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
delete_output
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
L2B
=
dict
(
type
=
'dict'
,
required
=
Fals
e
,
schema
=
dict
(
run_processor
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
write_output
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
delete_output
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
)),
L2C
=
dict
(
type
=
'dict'
,
required
=
Tru
e
,
schema
=
dict
(
run_processor
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
write_output
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
delete_output
=
dict
(
type
=
'boolean'
,
required
=
Tru
e
),
L2C
=
dict
(
type
=
'dict'
,
required
=
Fals
e
,
schema
=
dict
(
run_processor
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
write_output
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
delete_output
=
dict
(
type
=
'boolean'
,
required
=
Fals
e
),
)),
)),
usecase
=
dict
(
type
=
'dict'
,
required
=
Tru
e
,
schema
=
dict
(
virtual_sensor_id
=
dict
(
type
=
'integer'
,
required
=
Tru
e
),
# TODO add possible values
datasetid_spatial_ref
=
dict
(
type
=
'integer'
,
required
=
Tru
e
,
nullable
=
True
),
datasetid_spectral_ref
=
dict
(
type
=
'integer'
,
required
=
Tru
e
,
nullable
=
True
),
type
=
'dict'
,
required
=
Fals
e
,
schema
=
dict
(
virtual_sensor_id
=
dict
(
type
=
'integer'
,
required
=
Fals
e
),
# TODO add possible values
datasetid_spatial_ref
=
dict
(
type
=
'integer'
,
required
=
Fals
e
,
nullable
=
True
),
datasetid_spectral_ref
=
dict
(
type
=
'integer'
,
required
=
Fals
e
,
nullable
=
True
),
target_CWL
=
dict
(
type
=
'list'
,
required
=
False
,
schema
=
dict
(
type
=
'float'
)),
target_FWHM
=
dict
(
type
=
'list'
,
required
=
False
,
schema
=
dict
(
type
=
'float'
)),
target_gsd
=
dict
(
type
=
'list'
,
required
=
False
,
schema
=
dict
(
type
=
'float'
),
maxlength
=
2
),
target_epsg_code
=
dict
(
type
=
'integer'
,
required
=
Tru
e
,
nullable
=
True
),
target_epsg_code
=
dict
(
type
=
'integer'
,
required
=
Fals
e
,
nullable
=
True
),
spatial_ref_gridx
=
dict
(
type
=
'list'
,
required
=
False
,
schema
=
dict
(
type
=
'float'
),
maxlength
=
2
),
spatial_ref_gridy
=
dict
(
type
=
'list'
,
required
=
False
,
schema
=
dict
(
type
=
'float'
),
maxlength
=
2
),
)),
)
def
get_updated_schema
(
source_schema
,
key2update
,
new_value
):
def
deep_update
(
schema
,
key2upd
,
new_val
):
"""Return true if update, else false"""
for
key
in
schema
:
if
key
==
key2upd
:
schema
[
key
]
=
new_val
elif
isinstance
(
schema
[
key
],
dict
):
deep_update
(
schema
[
key
],
key2upd
,
new_val
)
return
schema
from
copy
import
deepcopy
tgt_schema
=
deepcopy
(
source_schema
)
return
deep_update
(
tgt_schema
,
key2update
,
new_value
)
gms_schema_config_output
=
get_updated_schema
(
gms_schema_input
,
key2update
=
'required'
,
new_value
=
True
)
gms_preprocessing/processing/pipeline.py
View file @
175e57f1
...
...
@@ -110,7 +110,7 @@ def L1C_map(L1B_objs):
L1C_objs
=
[
L1C_P
.
L1C_object
(
L1B_obj
)
for
L1B_obj
in
L1B_objs
]
# check in config if atmospheric correction is desired
if
CFG
.
conversion_type
_optical
==
'BOA_Ref'
:
if
CFG
.
target_radunit
_optical
==
'BOA_Ref'
:
# atmospheric correction (asserts that there is an ac_options.json file on disk for the current sensor)
if
L1C_objs
[
0
].
ac_options
:
# perform atmospheric correction
...
...
@@ -120,7 +120,7 @@ def L1C_map(L1B_objs):
%
(
L1C_obj
.
satellite
,
L1C_obj
.
sensor
))
for
L1C_obj
in
L1C_objs
]
else
:
[
L1C_obj
.
logger
.
warning
(
'Atmospheric correction skipped because optical conversion type is set to %s.'
%
CFG
.
conversion_type
_optical
)
for
L1C_obj
in
L1C_objs
]
%
CFG
.
target_radunit
_optical
)
for
L1C_obj
in
L1C_objs
]
# write outputs and delete temporary data
for
i
,
L1C_obj
in
enumerate
(
L1C_objs
):
...
...
gms_preprocessing/processing/process_controller.py
View file @
175e57f1
...
...
@@ -38,31 +38,24 @@ __author__ = 'Daniel Scheffler'
class
process_controller
(
object
):
def
__init__
(
self
,
job_ID
,
exec_mode
=
'Python'
,
db_host
=
'localhost'
,
parallelization_level
=
'scenes'
,
delete_old_output
=
False
,
job_config_kwargs
=
None
):
# type: (int, str, str, str, bool, dict) -> None
def
__init__
(
self
,
job_ID
,
**
config_kwargs
):
"""gms_preprocessing process controller
:param job_ID: <int> a job ID belonging to a valid database record within table 'jobs'
:param exec_mode: <str> choices: 'Python' - writes all intermediate data to disk
'Flink' - keeps all intermediate data in memory
:param db_host: <str> hostname of the host where database is hosted
:param parallelization_level: <str> choices: 'scenes' - parallelization on scene-level
'tiles' - parallelisation on tile-level
:param delete_old_output: <bool> whether to delete previously created output of the given job ID
before running the job (default = False)
:param job_ID: job ID belonging to a valid database record within table 'jobs'
:param config_kwargs: keyword arguments to be passed to gms_preprocessing.set_config()
"""
# assertions
if
not
isinstance
(
job_ID
,
int
):
raise
ValueError
(
"'job_ID' must be an integer value. Got %s."
%
type
(
job_ID
))
if
exec_mode
not
in
[
'Python'
,
'Flink'
]:
raise
ValueError
(
"Unexpected exec_mode '%s'!"
%
exec_mode
)
if
parallelization_level
not
in
[
'scenes'
,
'tiles'
]:
raise
ValueError
(
"Unexpected parallelization_level '%s'!"
%
parallelization_level
)
self
.
parallLev
=
parallelization_level
# set GMS configuration
config_kwargs
.
update
(
dict
(
reset_status
=
True
))
set_config
(
job_ID
,
**
config_kwargs
)
self
.
config
=
GMS_config
# type: GMS_config
# defaults
self
.
_logger
=
None
self
.
_DB_job_record
=
None
self
.
profiler
=
None
...
...
@@ -78,11 +71,6 @@ class process_controller(object):
self
.
summary_detailed
=
None
self
.
summary_quick
=
None
# set GMS configuration
set_config
(
job_ID
=
job_ID
,
exec_mode
=
exec_mode
,
db_host
=
db_host
,
reset
=
True
,
**
job_config_kwargs
if
job_config_kwargs
else
{})
self
.
config
=
GMS_config
# check environment
self
.
GMSEnv
=
ENV
.
GMSEnvironment
(
self
.
logger
)
self
.
GMSEnv
.
check_dependencies
()
...
...
@@ -98,7 +86,7 @@ class process_controller(object):
self
.
logger
.
info
(
'Process Controller initialized for job ID %s (comment: %s).'
%
(
self
.
config
.
ID
,
self
.
DB_job_record
.
comment
))
if
delete_old_output
:
if
self
.
config
.
delete_old_output
: