geomultisens / gms_preprocessing · Commits · e5522a45

Commit e5522a45, authored Nov 24, 2017 by Daniel Scheffler
Config revision - intermediate state: Tests are running again.
Former-commit-id: 64594529
Former-commit-id: 7df60bbb
parent 98bce4a0
Showing 24 changed files
bin/run_gms.py
...
...
@@ -20,8 +20,8 @@ def run_from_jobid(args):
     # set up process controller instance
     PC = process_controller(args.jobid, parallelization_level='scenes', db_host='geoms')  # FIXME hardcoded host

-    # PC.job.path_procdata_scenes = '/geoms/data/processed_scenes_dev'
-    # PC.job.path_procdata_MGRS = '/geoms/data/processed_mgrs_tiles_dev'
+    # PC.path_procdata_scenes = '/geoms/data/processed_scenes_dev'
+    # PC.path_procdata_MGRS = '/geoms/data/processed_mgrs_tiles_dev'

     # run the job
     PC.run_all_processors()
...
...
@@ -112,8 +112,8 @@ def _run_job(dbJob, parallelization_level='scenes'):
     warnings.warn("Currently the console argument parser sets the parallelization level to 'scenes'.")  # TODO

     PC = process_controller(jobid, parallelization_level=parallelization_level)

-    # PC.job.path_procdata_scenes = '/geoms/data/processed_scenes_dev'
-    # PC.job.path_procdata_MGRS = '/geoms/data/processed_mgrs_tiles_dev'
+    # PC.path_procdata_scenes = '/geoms/data/processed_scenes_dev'
+    # PC.path_procdata_MGRS = '/geoms/data/processed_mgrs_tiles_dev'

     # run the job
     PC.run_all_processors()
...
...
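Note: every hunk in this commit follows the scheme shown above: options that used to be nested under the CFG.job and CFG.usecase sub-objects are now read directly from the flattened CFG object. A purely illustrative stand-in (not the project code) of that access-path change:

    # Illustrative stand-ins only; the real object is gms_preprocessing's GMS_config.
    from types import SimpleNamespace

    # before this commit: options nested under CFG.job / CFG.usecase
    CFG_old = SimpleNamespace(job=SimpleNamespace(exec_mode='Flink'),
                              usecase=SimpleNamespace(scale_factor_TOARef=10000))
    assert CFG_old.job.exec_mode == 'Flink'

    # after this commit: one flat config object exposes the same options directly
    CFG_new = SimpleNamespace(exec_mode='Flink', scale_factor_TOARef=10000)
    assert CFG_new.exec_mode == 'Flink'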
gms_preprocessing/algorithms/L1A_P.py
...
...
@@ -143,7 +143,7 @@ class L1A_object(GMS_object):
             subset = ['block', [[sub_dim[0], sub_dim[1] + 1], [sub_dim[2], sub_dim[3] + 1]]]
         rasObj = GEOP.GEOPROCESSING(paths_files2stack[0], self.logger, subset=subset)

-        if CFG.job.exec_mode == 'Flink' and path_output is None:  # numpy array output
+        if CFG.exec_mode == 'Flink' and path_output is None:  # numpy array output
             self.arr = rasObj.Layerstacking(paths_files2stack)
             self.path_InFilePreprocessor = paths_files2stack[0]
         else:  # 'MEMORY' or physical output
...
...
@@ -162,7 +162,7 @@ class L1A_object(GMS_object):
             subset = ['block', [[sub_dim[0], sub_dim[1] + 1], [sub_dim[2], sub_dim[3] + 1]]]
         rasObj = GEOP.GEOPROCESSING(path_file2load, self.logger, subset=subset)

-        if CFG.job.exec_mode == 'Flink' and path_output is None:  # numpy array output
+        if CFG.exec_mode == 'Flink' and path_output is None:  # numpy array output
             self.arr = gdalnumeric.LoadFile(path_file2load) if subset is None else \
                 gdalnumeric.LoadFile(path_file2load, rasObj.colStart, rasObj.rowStart, rasObj.cols, rasObj.rows)
             self.path_InFilePreprocessor = path_file2load
...
...
@@ -190,7 +190,7 @@ class L1A_object(GMS_object):
                     data_arr = np.empty(data.shape + (len(self.LayerBandsAssignment),), data.dtype)
                 data_arr[:, :, bidx] = data

-            if CFG.job.exec_mode == 'Flink' and path_output is None:  # numpy array output
+            if CFG.exec_mode == 'Flink' and path_output is None:  # numpy array output
                 self.arr = data_arr
             else:
                 GEOP.ndarray2gdal(data_arr, path_output, geotransform=ds.GetGeoTransform(),
...
...
@@ -221,7 +221,7 @@ class L1A_object(GMS_object):
                     data_arr = np.empty(data.shape + (len(self.LayerBandsAssignment),), data.dtype)
                 data_arr[:, :, i] = data

-            if CFG.job.exec_mode == 'Flink' and path_output is None:  # numpy array output
+            if CFG.exec_mode == 'Flink' and path_output is None:  # numpy array output
                 self.arr = data_arr
             else:
                 GEOP.ndarray2gdal(data_arr, path_output, direction=3)
...
...
@@ -271,7 +271,7 @@ class L1A_object(GMS_object):
     def calc_TOARadRefTemp(self, subset=None):
         """Convert DN, Rad or TOA_Ref data to TOA Reflectance, to Radiance or to Surface Temperature
-        (depending on CFG.usecase.conversion_type_optical and conversion_type_thermal).
+        (depending on CFG.conversion_type_optical and conversion_type_thermal).
         The function can be executed by a L1A_object representing a full scene or a tile. To process a file from disk
         in tiles, provide an item of self.tile_pos as the 'subset' argument."""
...
...
@@ -305,7 +305,7 @@ class L1A_object(GMS_object):
         for optical_thermal in ['optical', 'thermal']:
             if optical_thermal not in self.dict_LayerOptTherm.values():
                 continue
-            conv = getattr(CFG.usecase, 'conversion_type_%s' % optical_thermal)
+            conv = getattr(CFG, 'conversion_type_%s' % optical_thermal)
             conv = conv if conv != 'BOA_Ref' else 'TOA_Ref'
             assert conv in ['Rad', 'TOA_Ref', 'Temp'], 'Unsupported conversion type: %s' % conv
             arr_desc = self.arr_desc.split('/')[0] if optical_thermal == 'optical' else self.arr_desc.split('/')[-1]
...
...
@@ -330,7 +330,7 @@ class L1A_object(GMS_object):
                                                    inSaturated) if conv == 'TOA_Ref' else \
                     GEOP.DN2DegreesCelsius_fastforward(inArray, OFF, GAI, K1, K2, 0.95, inFill, inZero, inSaturated)
                 if conv == 'TOA_Ref':
-                    self.MetaObj.ScaleFactor = CFG.usecase.scale_factor_TOARef
+                    self.MetaObj.ScaleFactor = CFG.scale_factor_TOARef
             elif arr_desc == 'Rad':
                 raise NotImplementedError("Conversion Rad to %s is currently not supported." % conv)
...
...
@@ -349,16 +349,16 @@ class L1A_object(GMS_object):
                             '13 bands and it not clear for which bands the gains are provided.')
                     raise NotImplementedError("Conversion TOA_Ref to %s is currently not supported." % conv)
                 else:  # conv=='TOA_Ref'
-                    if self.MetaObj.ScaleFactor != CFG.usecase.scale_factor_TOARef:
-                        res = self.rescale_array(inArray, CFG.usecase.scale_factor_TOARef, self.MetaObj.ScaleFactor)
-                        self.MetaObj.ScaleFactor = CFG.usecase.scale_factor_TOARef
+                    if self.MetaObj.ScaleFactor != CFG.scale_factor_TOARef:
+                        res = self.rescale_array(inArray, CFG.scale_factor_TOARef, self.MetaObj.ScaleFactor)
+                        self.MetaObj.ScaleFactor = CFG.scale_factor_TOARef
                         self.log_for_fullArr_or_firstTile(
-                            'Rescaling Ref data to scaling factor %d.' % CFG.usecase.scale_factor_TOARef)
+                            'Rescaling Ref data to scaling factor %d.' % CFG.scale_factor_TOARef)
                     else:
                         res = inArray
                         self.log_for_fullArr_or_firstTile('The input data already represents TOA '
                                                           'reflectance with the desired scale factor of %d.'
-                                                          % CFG.usecase.scale_factor_TOARef)
+                                                          % CFG.scale_factor_TOARef)
             else:  # arr_desc == 'Temp'
                 raise NotImplementedError("Conversion Temp to %s is currently not supported." % conv)
...
...
@@ -390,8 +390,8 @@ class L1A_object(GMS_object):
         self.update_spec_vals_according_to_dtype('int16')
         tiles_desc = '_'.join([desc for op_th, desc in zip(['optical', 'thermal'],
-                                                           [CFG.usecase.conversion_type_optical,
-                                                            CFG.usecase.conversion_type_thermal])
+                                                           [CFG.conversion_type_optical,
+                                                            CFG.conversion_type_thermal])
                                if desc in self.dict_LayerOptTherm.values()])

         self.arr = dataOut
...
...
@@ -452,12 +452,12 @@ class L1A_object(GMS_object):
                                    dst_CS_datum='WGS84', mode='GDAL', use_workspace=True,
                                    inFill=self.MetaObj.spec_vals['fill'])

-            if CFG.job.exec_mode == 'Python':
+            if CFG.exec_mode == 'Python':
                 path_warped = os.path.join(self.ExtractedFolder, self.baseN + '__' + self.arr_desc)
                 GEOP.ndarray2gdal(rasObj.tondarray(direction=3), path_warped, importFile=rasObj.desc, direction=3)
                 self.MetaObj.Dataname = path_warped
                 self.arr = path_warped
-            else:  # CFG.job.exec_mode=='Flink':
+            else:  # CFG.exec_mode=='Flink':
                 self.arr = rasObj.tondarray(direction=3)

             self.shape_fullArr = [rasObj.rows, rasObj.cols, rasObj.bands]
...
...
@@ -473,7 +473,7 @@ class L1A_object(GMS_object):
             self.MetaObj.CornerTieP_UTM = rasObj.get_corner_coordinates('UTM')
             self.meta_odict = self.MetaObj.to_odict()  # important in order to keep geotransform/projection

-            if CFG.job.exec_mode == 'Flink':
+            if CFG.exec_mode == 'Flink':
                 self.delete_tempFiles()  # these files are needed later in Python execution mode
                 self.MetaObj.Dataname = previous_dataname  # /vsi.. pointing directly to raw data archive (which exists)
...
...
gms_preprocessing/algorithms/L1B_P.py
...
...
@@ -76,7 +76,7 @@ class Scene_finder(object):
                 SpIM = SpatialIndexMediator(timeout=timeout)
                 self.possib_ref_scenes = \
                     SpIM.getFullSceneDataForDataset(self.boundsLonLat, self.timeStart, self.timeEnd, self.min_cloudcov,
-                                                    self.max_cloudcov, CFG.usecase.datasetid_spatial_ref,
+                                                    self.max_cloudcov, CFG.datasetid_spatial_ref,
                                                     refDate=self.src_AcqDate, maxDaysDelta=self.plusminus_days)
                 break
             except socket.timeout:
...
...
@@ -116,7 +116,7 @@ class Scene_finder(object):
         # get processing level of reference scenes
         procL = GeoDataFrame(
-            DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes_proc', ['sceneid', 'proc_level'],
+            DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes_proc', ['sceneid', 'proc_level'],
                                             {'sceneid': list(GDF.sceneid)}), columns=['sceneid', 'proc_level'])
         GDF = GDF.merge(procL, on='sceneid', how='left')
         GDF = GDF.where(GDF.notnull(), None)  # replace NaN values with None
...
...
@@ -129,7 +129,7 @@ class Scene_finder(object):
         GDF['refDs_exists'] = list(GDF['path_ref'].map(lambda p: os.path.exists(p) if p else False))

         # check if a proper entity ID can be gathered from database
-        eID = GeoDataFrame(DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['id', 'entityid'],
+        eID = GeoDataFrame(DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['id', 'entityid'],
                                                            {'id': list(GDF.sceneid)}), columns=['sceneid', 'entityid'])
         GDF = GDF.merge(eID, on='sceneid', how='left')
         self.GDF_ref_scenes = GDF.where(GDF.notnull(), None)
...
...
@@ -284,7 +284,7 @@ class L1B_object(L1A_object):
         plusminus_days = 30
         AcqDate = self.im2shift_objDict['acquisition_date']
         date_minmax = [AcqDate - timedelta(days=plusminus_days), AcqDate + timedelta(days=plusminus_days)]
-        dataset_cond = 'datasetid=%s' % CFG.usecase.datasetid_spatial_ref
+        dataset_cond = 'datasetid=%s' % CFG.datasetid_spatial_ref
         cloudcov_cond = 'cloudcover < %s' % max_cloudcov
         # FIXME cloudcover is not yet available for all scenes at proc_level METADATA
         dayrange_cond = "(EXTRACT(MONTH FROM scenes.acquisitiondate), EXTRACT(DAY FROM scenes.acquisitiondate)) " \
...
...
@@ -294,7 +294,7 @@ class L1B_object(L1A_object):
         def query_scenes(condlist):
             return DB_T.get_overlapping_scenes_from_postgreSQLdb(
-                CFG.job.conn_database,
+                CFG.conn_database,
                 table='scenes',
                 tgt_corners_lonlat=self.trueDataCornerLonLat,
                 conditions=condlist,
...
...
@@ -311,9 +311,9 @@ class L1B_object(L1A_object):
         # this is only the result from scenes_proc
         # -> a reference is only stored there if the scene was already in the dataset list when the CFG.job started
         res = DB_T.get_overlapping_scenes_from_postgreSQLdb(
-            CFG.job.conn_database,
+            CFG.conn_database,
             tgt_corners_lonlat=self.trueDataCornerLonLat,
-            conditions=['datasetid=%s' % CFG.usecase.datasetid_spatial_ref],
+            conditions=['datasetid=%s' % CFG.datasetid_spatial_ref],
             add_cmds='ORDER BY scenes.cloudcover ASC',
             timeout=25000)
         filt_overlap_scenes = self._sceneIDList_to_filt_overlap_scenes([i[0] for i in res[:50]], 20.)
...
...
@@ -354,8 +354,8 @@ class L1B_object(L1A_object):
         # start download of scene data not available and start L1A processing
         def dl_cmd(scene_ID): print('%s %s %s' % (
-            CFG.job.java_commands['keyword'].strip(),  # FIXME CFG.job.java_commands is deprecated
-            CFG.job.java_commands["value_download"].strip(), scene_ID))
+            CFG.java_commands['keyword'].strip(),  # FIXME CFG.java_commands is deprecated
+            CFG.java_commands["value_download"].strip(), scene_ID))

         path = PG.path_generator(scene_ID=sc['scene_ID']).get_path_imagedata()
...
...
@@ -369,12 +369,12 @@ class L1B_object(L1A_object):
                     # check if scene is downloading
                     download_start_timeout = 5  # seconds
                     # set timeout for external processing
-                    # -> DEPRECATED BECAUSE CREATION OF EXTERNAL CFG.job WITHIN CFG.job IS NOT ALLOWED
+                    # -> DEPRECATED BECAUSE CREATION OF EXTERNAL CFG WITHIN CFG IS NOT ALLOWED
                     processing_timeout = 5  # seconds  # FIXME increase timeout if processing is really started
                     proc_level = None
                     while True:
-                        proc_level_chk = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['proc_level'],
-                                                                         {'id': sc['scene_ID']})[0][0]
+                        proc_level_chk = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['proc_level'],
+                                                                         {'id': sc['scene_ID']})[0][0]
                         if proc_level != proc_level_chk:
                             print('Reference scene %s, current processing level: %s' % (sc['scene_ID'], proc_level_chk))
                         proc_level = proc_level_chk
...
...
@@ -391,7 +391,7 @@ class L1B_object(L1A_object):
                     warnings.warn('L1A processing of reference scene %s (entity ID %s) timed out. '
                                   'Coregistration of this scene failed.' % (self.baseN, self.scene_ID))
                     break
-                # DB_T.set_info_in_postgreSQLdb(CFG.job.conn_database,'scenes',
+                # DB_T.set_info_in_postgreSQLdb(CFG.conn_database,'scenes',
                 #                               {'proc_level':'METADATA'},{'id':sc['scene_ID']})

                 time.sleep(5)
...
...
@@ -408,7 +408,7 @@ class L1B_object(L1A_object):
                 self.overlap_percentage = sc['overlap percentage']
                 self.overlap_area = sc['overlap area']

-                query_res = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityid'],
+                query_res = DB_T.get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['entityid'],
                                                             {'id': self.imref_scene_ID}, records2fetch=1)
                 assert query_res != [], 'No entity-ID found for scene number %s' % self.imref_scene_ID
                 self.imref_entity_ID = query_res[0][0]  # [('LC81510322013152LGN00',)]
...
...
@@ -504,7 +504,7 @@ class L1B_object(L1A_object):
         if spatIdxSrv_status == 'unavailable':
             self.logger.warning('Coregistration skipped due to unavailable Spatial Index Mediator Server!"')
-        elif CFG.job.skip_coreg:
+        elif CFG.skip_coreg:
             self.logger.warning('Coregistration skipped according to user configuration.')
         elif self.coreg_needed and self.spatRef_available:
...
...
@@ -604,7 +604,7 @@ class L1B_object(L1A_object):
             if self.coreg_info['success']:
                 self.logger.info("Correcting spatial shifts for attribute '%s'..." % attrname)
             elif cliptoextent and is_coord_grid_equal(
-                    geoArr.gt, CFG.usecase.spatial_ref_gridx, CFG.usecase.spatial_ref_gridy):
+                    geoArr.gt, CFG.spatial_ref_gridx, CFG.spatial_ref_gridy):
                 self.logger.info("Attribute '%s' has only been clipped to it's extent because no valid "
                                  "shifts have been detected and the pixel grid equals the target grid." % attrname)
...
...
@@ -615,12 +615,12 @@ class L1B_object(L1A_object):
                 # correct shifts
                 DS = DESHIFTER(geoArr, self.coreg_info,
-                               target_xyGrid=[CFG.usecase.spatial_ref_gridx, CFG.usecase.spatial_ref_gridy],
+                               target_xyGrid=[CFG.spatial_ref_gridx, CFG.spatial_ref_gridy],
                                cliptoextent=cliptoextent,
                                clipextent=mapBounds,
                                align_grids=True,
                                resamp_alg='nearest' if attrname == 'masks' else 'cubic',
-                               CPUs=None if CFG.job.allow_subMultiprocessing else 1,
+                               CPUs=None if CFG.allow_subMultiprocessing else 1,
                                progress=True if v else False,
                                q=True,
                                v=v)
...
...
gms_preprocessing/algorithms/L1C_P.py
...
...
@@ -148,8 +148,8 @@ class L1C_object(L1B_object):
                 meshwidth=10,
                 nodata_mask=None,  # don't overwrite areas outside the image with nodata
                 outFill=get_outFillZeroSaturated(np.float32)[0],
-                accurracy=CFG.usecase.SZA_SAA_calculation_accurracy,
-                lonlat_arr=self.lonlat_arr if CFG.usecase.SZA_SAA_calculation_accurracy == 'fine' else None)
+                accurracy=CFG.SZA_SAA_calculation_accurracy,
+                lonlat_arr=self.lonlat_arr if CFG.SZA_SAA_calculation_accurracy == 'fine' else None)
         return self._SZA_arr

     @SZA_arr.setter
...
...
@@ -259,7 +259,7 @@ class AtmCorr(object):
             path_logfile = inObj.pathGen.get_path_logfile()
             fileHandler = logging.FileHandler(path_logfile, mode='a')
             fileHandler.setFormatter(logger_atmCorr.formatter_fileH)
-            fileHandler.setLevel(CFG.job.log_level)
+            fileHandler.setLevel(CFG.log_level)

             logger_atmCorr.addHandler(fileHandler)
...
...
@@ -662,7 +662,7 @@ class AtmCorr(object):
         # compute cloud mask if not already provided
         if no_avail_CMs:
-            algorithm = CFG.usecase.cloud_masking_algorithm[self.inObjs[0].satellite]
+            algorithm = CFG.cloud_masking_algorithm[self.inObjs[0].satellite]

             if algorithm == 'SICOR':
                 return None
...
...
@@ -672,7 +672,7 @@ class AtmCorr(object):
             try:
                 from .cloud_masking import Cloud_Mask_Creator

-                CMC = Cloud_Mask_Creator(self.inObjs[0], algorithm=algorithm, tempdir_root=CFG.job.path_tempdir)
+                CMC = Cloud_Mask_Creator(self.inObjs[0], algorithm=algorithm, tempdir_root=CFG.path_tempdir)
                 CMC.calc_cloud_mask()
                 cm_geoarray = CMC.cloud_mask_geoarray
                 cm_array = CMC.cloud_mask_array
...
...
@@ -742,7 +742,7 @@ class AtmCorr(object):
         t0 = time()
         results = download_variables(date_from=self.inObjs[0].acq_datetime,
                                      date_to=self.inObjs[0].acq_datetime,
-                                     db_path=CFG.job.path_ECMWF_db,
+                                     db_path=CFG.path_ECMWF_db,
                                      max_step=120,  # default
                                      ecmwf_variables=default_products,
                                      processes=0,  # singleprocessing
...
...
@@ -792,7 +792,7 @@ class AtmCorr(object):
             script = False

         # check if ECMWF data are available - if not, start the download
-        if CFG.usecase.auto_download_ecmwf:
+        if CFG.auto_download_ecmwf:
             self._check_or_download_ECMWF_data()

         # validate SNR
...
...
@@ -887,7 +887,7 @@ class AtmCorr(object):
             # update metadata
             inObj.arr_desc = 'BOA_Ref'
             inObj.MetaObj.bands = len(self.results.data_ac)
-            inObj.MetaObj.PhysUnit = 'BOA_Reflectance in [0-%d]' % CFG.usecase.scale_factor_BOARef
+            inObj.MetaObj.PhysUnit = 'BOA_Reflectance in [0-%d]' % CFG.scale_factor_BOARef
             inObj.MetaObj.LayerBandsAssignment = out_LBA
             inObj.MetaObj.filter_layerdependent_metadata()
             inObj.meta_odict = inObj.MetaObj.to_odict()  # actually auto-updated by getter
...
...
@@ -896,7 +896,7 @@ class AtmCorr(object):
             # FIXME AC output nodata values = 0 -> new nodata areas but mask not updated
             oF_refl, oZ_refl, oS_refl = get_outFillZeroSaturated(inObj.arr.dtype)
             surf_refl = np.dstack((self.results.data_ac[bandN] for bandN in ac_bandNs))
-            surf_refl *= CFG.usecase.scale_factor_BOARef  # scale using scale factor (output is float16)
+            surf_refl *= CFG.scale_factor_BOARef  # scale using scale factor (output is float16)

             # FIXME really set AC nodata values to GMS outZero?
             surf_refl[nodata] = oZ_refl  # overwrite AC nodata values with GMS outZero

             # apply the original nodata mask (indicating background values)
...
...
@@ -927,8 +927,8 @@ class AtmCorr(object):
                 ac_bandNs = [bandN for bandN in inObj.arr.bandnames if bandN in self.results.data_ac.keys()]
                 ac_errors = np.dstack((self.results.data_errors[bandN] for bandN in ac_bandNs))
-                ac_errors *= CFG.usecase.scale_factor_errors_ac  # scale using scale factor (output is float16)
-                out_dtype = np.int8 if CFG.usecase.scale_factor_errors_ac <= 255 else np.int16
+                ac_errors *= CFG.scale_factor_errors_ac  # scale using scale factor (output is float16)
+                out_dtype = np.int8 if CFG.scale_factor_errors_ac <= 255 else np.int16
                 ac_errors[nodata] = get_outFillZeroSaturated(out_dtype)[0]
                 ac_errors = ac_errors.astype(out_dtype)
                 inObj.ac_errors = ac_errors  # setter generates a GeoArray with the same bandnames as inObj.arr
...
...
@@ -989,8 +989,8 @@ class AtmCorr(object):
             if self.results.mask_clouds.mask_confidence_array is not None:
                 cfd_arr = self.results.mask_clouds.mask_confidence_array  # float32 2D array, scaled [0-1, nodata 255]
                 cfd_arr[cfd_arr == self.ac_input['options']['cld_mask']['nodata_value_mask']] = -1
-                cfd_arr = (cfd_arr * CFG.usecase.scale_factor_BOARef).astype(np.int16)
-                cfd_arr[cfd_arr == -CFG.usecase.scale_factor_BOARef] = get_outFillZeroSaturated(cfd_arr.dtype)[0]
+                cfd_arr = (cfd_arr * CFG.scale_factor_BOARef).astype(np.int16)
+                cfd_arr[cfd_arr == -CFG.scale_factor_BOARef] = get_outFillZeroSaturated(cfd_arr.dtype)[0]

                 joined = False
                 for inObj in self.inObjs:
...
...
gms_preprocessing/algorithms/L2B_P.py
...
...
@@ -41,7 +41,7 @@ class L2B_object(L2A_object):
     def spectral_homogenization(self, kind='linear'):
         src_cwls = self.meta_odict['wavelength']
         # FIXME exclude or include thermal bands; respect sorted CWLs in context of LayerBandsAssignment
-        tgt_cwls = CFG.usecase.target_CWL
+        tgt_cwls = CFG.target_CWL

         if src_cwls != tgt_cwls:
             assert kind in ['linear', ], "%s is not a supported kind of homogenization." % kind
             self.log_for_fullArr_or_firstTile(
...
...
gms_preprocessing/algorithms/cloud_masking.py
...
...
@@ -71,7 +71,7 @@ class _FMASK_Runner(object):
     def is_GMSConfig_available(self):
         from ..config import GMS_config as CFG

         try:
-            if CFG.job is not None:
+            if CFG is not None:
                 return True
         except (EnvironmentError, OSError):
             return False
...
...
@@ -350,7 +350,7 @@ class FMASK_Runner_Sentinel2(_FMASK_Runner):
         if not self._granule_ID and self.scene_ID and self.scene_ID != -9999 and self.is_GMSConfig_available:
             from ..config import GMS_config as CFG
-            res = get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityid'], {'id': self.scene_ID})
+            res = get_info_from_postgreSQLdb(CFG.conn_database, 'scenes', ['entityid'], {'id': self.scene_ID})
             assert len(res) != 0, \
                 "Invalid SceneID given - no corresponding scene with the ID=%s found in database.\n" % self.scene_ID
             assert len(res) == 1, "Error in database. The sceneid %s exists more than once.\n" % self.scene_ID
...
...
@@ -480,7 +480,7 @@ class Cloud_Mask_Creator(object):
         self.GMS_obj.logger.info("Calculating cloud mask based on '%s' algorithm..." % self.algorithm)

-        if self.algorithm is 'FMASK':
+        if self.algorithm == 'FMASK':
             if re.search('Landsat', self.GMS_obj.satellite, re.I):
                 FMR = FMASK_Runner_Landsat(self.GMS_obj.path_archive, self.GMS_obj.satellite)
...
...
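Note: the 'is' to '==' change in the Cloud_Mask_Creator hunk above fixes a genuine comparison bug: 'is' tests object identity, not string equality, and only appeared to work because CPython happens to intern short string literals. A minimal illustration:

    a = ''.join(['FM', 'ASK'])  # built at runtime, typically not interned
    b = 'FMASK'
    assert a == b               # value equality: always True here
    print(a is b)               # identity: usually False, so the old check could fail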
gms_preprocessing/algorithms/geoprocessing.py
...
...
@@ -100,7 +100,7 @@ class GEOPROCESSING(object):
         self.shortname, self.extension = os.path.splitext(self.filename)

         # ****OBJECT ATTRIBUTES***************************************************
-        self.workspace = os.path.join(CFG.job.path_tempdir, 'GEOPROCESSING_temp') if workspace is None else workspace
+        self.workspace = os.path.join(CFG.path_tempdir, 'GEOPROCESSING_temp') if workspace is None else workspace
         if v:
             self.logger.debug("\n--")
             self.logger.debug("\ttemporary geoprocessing workspace", self.workspace)
...
...
@@ -379,7 +379,7 @@ class GEOPROCESSING(object):
         # %(dst_EPSG_code, in_nodataVal,out_nodataVal, translatedFile, warpedFile))
         os.system('gdalwarp -of ENVI --config GDAL_CACHEMAX 2048 -wm 2048 -t_srs EPSG:%s -tps -r \
                   cubic -srcnodata %s -dstnodata %s -multi -overwrite -wo NUM_THREADS=%s -q %s %s'
-                  % (dst_EPSG_code, inFill, out_nodataVal, CFG.job.CPUs, translatedFile, warpedFile))
+                  % (dst_EPSG_code, inFill, out_nodataVal, CFG.CPUs, translatedFile, warpedFile))
         # import shutil
         # only for bugfixing:
         # shutil.copy(translatedFile, \
...
...
@@ -405,7 +405,7 @@ class GEOPROCESSING(object):
         # %(dst_EPSG_code,in_nodataVal,out_nodataVal,translatedFile,warpedFile))
         os.system('gdalwarp -of VRT --config GDAL_CACHEMAX 2048 -wm 2048 -ot Int16 -t_srs EPSG:%s -tps -r \
                   cubic -srcnodata %s -dstnodata %s -overwrite -multi -wo NUM_THREADS=%s -q %s %s'
-                  % (dst_EPSG_code, inFill, out_nodataVal, CFG.job.CPUs, translatedFile, warpedFile))
+                  % (dst_EPSG_code, inFill, out_nodataVal, CFG.CPUs, translatedFile, warpedFile))
         # print('warped')
         print('GDAL warping time', time.time() - t0)
...
...
gms_preprocessing/config.py
...
...
@@ -16,61 +16,25 @@ import json
 from jsmin import jsmin
 from cerberus import Validator
 import pkgutil
-from typing import TYPE_CHECKING, Dict
+from typing import TYPE_CHECKING

 if TYPE_CHECKING:
-    from .misc.database_tools import GMS_JOB
+    from .misc.database_tools import GMS_JOB  # noqa F401  # flake8 issue

 __author__ = 'Daniel Scheffler'


-def set_configOLD(job_ID, exec_mode='Python', db_host='localhost', reset=False, job_kwargs=None):
-    # type: (int, str, str, bool, dict) -> None
-    """Set up a configuration for a new gms_preprocessing job!
-
-    :param job_ID:      job ID of the job to be executed, e.g. 123456 (must be present in database)
-    :param exec_mode:   'Python': writes intermediate results to disk in order to save memory
-                        'Flink': keeps intermediate results in memory in order to save IO time
-    :param db_host:     host name of the server that runs the postgreSQL database
-    :param reset:       whether to reset the job status or not (default=False)
-    :param job_kwargs:  keyword arguments to be passed to gms_preprocessing.config.Job() (see documentation there)
-    """
-    if not hasattr(builtins, 'GMS_job') or not hasattr(builtins, 'GMS_usecase') or reset:
-        job_kwargs = job_kwargs if job_kwargs else {}
-        builtins.GMS_job = Job(job_ID, exec_mode=exec_mode, db_host=db_host, **job_kwargs)
-        builtins.GMS_usecase = Usecase(getattr(builtins, 'GMS_job'))
-
-
 class GMS_configuration(object):
-    @property
-    def job(self):
-        if hasattr(builtins, 'GMS_job'):
-            return getattr(builtins, 'GMS_job')
+    def __getattr__(self, attr):
+        if hasattr(builtins, 'GMS_JobConfig'):
+            if attr in ['job', 'usecase']:
+                # This is only to keep compatibility with HU-INF codes
+                return getattr(builtins, 'GMS_JobConfig')
+            return getattr(builtins.GMS_JobConfig, attr)
         else:
             raise EnvironmentError("Config has not been set already on this machine. Run 'set_config()' first!'")

-    @job.setter
-    def job(self, job_obj):
-        assert isinstance(job_obj, Job), \
-            "GMS_configuration.job can only be set to an instance of the class 'config.Job'."
-        builtins.GMS_job = job_obj
-
-    @property
-    def usecase(self):
-        if hasattr(builtins, 'GMS_usecase'):
-            return getattr(builtins, 'GMS_usecase')
-        else:
-            raise EnvironmentError("Config has not been set already on this machine. Run 'set_config()' first!'")
-
-    @usecase.setter
-    def usecase(self, usecase_obj):
-        assert isinstance(usecase_obj, Usecase), \
-            "GMS_configuration.usecase can only be set to an instance of the class 'config.Usecase'."
-        builtins.GMS_usecase = usecase_obj


 GMS_config = GMS_configuration()
...
...
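Note: the rewritten GMS_configuration above replaces the explicit job/usecase properties with a single __getattr__ hook, so every attribute lookup on the proxy is forwarded to one flat JobConfig object stored in builtins, while 'job' and 'usecase' survive as compatibility aliases. A runnable sketch of that delegation pattern (stand-in classes, not the project code):

    import builtins

    class _FlatConfig(object):  # stand-in for the new flat JobConfig
        exec_mode = 'Python'

    class _ConfigProxy(object):  # mirrors the new GMS_configuration.__getattr__
        def __getattr__(self, attr):
            if hasattr(builtins, 'GMS_JobConfig'):
                if attr in ['job', 'usecase']:
                    return getattr(builtins, 'GMS_JobConfig')  # legacy aliases
                return getattr(builtins.GMS_JobConfig, attr)
            raise EnvironmentError("Config has not been set. Run 'set_config()' first!")

    builtins.GMS_JobConfig = _FlatConfig()
    CFG = _ConfigProxy()
    assert CFG.exec_mode == 'Python'      # new flat access
    assert CFG.job.exec_mode == 'Python'  # legacy CFG.job.* still resolves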
@@ -288,7 +252,7 @@ class JobConfig(object):
         self.exec_mode = \
             gp('exec_mode', json_opts['exec_mode'])
         self.CPUs = \
-            gp('CPUs', json_opts['exec_mode'], fallback=multiprocessing.cpu_count())
+            gp('CPUs', json_opts['CPUs'], fallback=multiprocessing.cpu_count())
         self.allow_subMultiprocessing = \
             gp('allow_subMultiprocessing', json_opts['allow_subMultiprocessing'])
         self.disable_exception_handler = \
...
...
@@ -459,6 +423,12 @@
         self.spatial_ref_gridx = np.arange(xgsd / 2., xgsd / 2. + 2 * xgsd, xgsd)  # e.g. [15, 45]
         self.spatial_ref_gridy = np.arange(ygsd / 2., ygsd / 2. + 2 * ygsd, ygsd)

+        #############
+        # data list #
+        #############
+
+        self.data_list = self.get_data_list_of_current_jobID()
+
     @property
     def user_opts_defaults(self):
         if not self._user_opts_defaults:
...
...
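Note: self.data_list is now filled at JobConfig construction time by get_data_list_of_current_jobID(), which is added at the end of this diff. Its core is a PostgreSQL unnest() over the jobs.sceneids array column; a minimal sketch of just that query pattern (hypothetical connection string and job ID):

    import psycopg2

    conn_database = 'dbname=geomultisens host=localhost'  # hypothetical DSN
    with psycopg2.connect(conn_database) as conn:
        with conn.cursor() as cur:
            # unnest() expands the sceneids array into one row per scene ID,
            # which can then be joined against the scenes table.
            cur.execute("""
                WITH jobs_unnested AS (
                    SELECT id, unnest(sceneids) AS sceneid FROM jobs
                )
                SELECT sceneid FROM jobs_unnested WHERE id = %s
                """, (123456,))  # hypothetical job ID
            scene_ids = [row[0] for row in cur.fetchall()]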
@@ -490,7 +460,7 @@ class JobConfig(object):
# type: () -> GMS_JOB
if
not
self
.
_DB_job_record
:
# check if job ID exists in database
from
.misc.database_tools
import
GMS_JOB
from
.misc.database_tools
import
GMS_JOB
# noqa F811 # redefinition of unused 'GMS_JOB' from line 22
try
:
self
.
_DB_job_record
=
GMS_JOB
(
self
.
conn_database
).
from_job_ID
(
self
.
ID
)
except
ValueError
:
...
...
@@ -544,8 +514,9 @@ class JobConfig(object):
                                                  'options_default.json'), validation=validate)

         # update default options with those from DB
-        db_options = json_to_python(json.loads(jsmin(self.DB_job_record.analysis_parameter)))  # type: dict
-        default_options.update(db_options)
+        if self.DB_job_record.analysis_parameter:
+            db_options = json_to_python(json.loads(jsmin(self.DB_job_record.analysis_parameter)))  # type: dict
+            default_options.update(db_options)

         if validate:
             GMSValidator().validate(default_options)
...
...
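Note: the guard added above presumably exists because a DB job record may carry no analysis_parameter at all; passing None through jsmin()/json.loads() would raise instead of simply keeping the defaults. A standalone sketch of the guarded update (hypothetical values; jsmin is the same minifier this module imports):

    import json
    from jsmin import jsmin

    default_options = {'exec_mode': 'Python'}  # stand-in for options_default.json
    analysis_parameter = None                  # hypothetical job without custom options

    if analysis_parameter:                     # skip parsing when empty/None
        default_options.update(json.loads(jsmin(analysis_parameter)))

    print(default_options)                     # defaults remain untouched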
@@ -575,6 +546,82 @@ class JobConfig(object):
         return {'args': {k: v for k, v in argskwargs.items() if k in argsnames},
                 'kwargs': {k: v for k, v in argskwargs.items() if k not in argsnames}}

+    def get_data_list_of_current_jobID(self):
+        """
+        Get a list of datasets to be processed from database and return it together with some metadata.
+
+        :return: <list> of OrderedDicts, e.g. [OrderedDict([('proc_level', None), ('scene_ID', 5895940),
+                 ('datasetid', 104), ('image_type', 'RSD'), ('satellite', 'Landsat-8'), ('sensor', 'OLI_TIRS'),
+                 ('subsystem', ''), ('acquisition_date', datetime.datetime(2015, 2, 5, 10, 2, 52)),
+                 ('entity_ID', 'LC81930242015036LGN00'), ('filename', 'LC81930242015036LGN00.tar.gz'),
+                 ('sensormode', 'M'), ('logger', None)]), ...]
+        """
+        from .model.metadata import get_sensormode
+        data_list = []
+        with psycopg2.connect(self.conn_database) as conn:
+            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
+                cur.execute("""
+                    WITH jobs_unnested AS (
+                        SELECT id, unnest(sceneids) AS sceneid FROM jobs
+                    )
+
+                    SELECT jobs_unnested.sceneid,
+                           scenes.datasetid,
+                           scenes.acquisitiondate,
+                           scenes.entityid,
+                           scenes.filename,
+                           COALESCE(scenes_proc.proc_level::text, 'L1A') AS proc_level,
+                           datasets.image_type,
+                           satellites.name AS satellite,
+                           sensors.name AS sensor,
+                           subsystems.name AS subsystem
+                    FROM jobs_unnested
+                    LEFT OUTER JOIN scenes ON scenes.id = jobs_unnested.sceneid
+                    LEFT OUTER JOIN scenes_proc ON scenes_proc.sceneid = jobs_unnested.sceneid
+                    LEFT OUTER JOIN datasets ON datasets.id = datasetid
+                    LEFT OUTER JOIN satellites ON satellites.id = satelliteid
+                    LEFT OUTER JOIN sensors ON sensors.id = sensorid
+                    LEFT OUTER JOIN subsystems ON subsystems.id = subsystemid
+                    WHERE jobs_unnested.id = %s
+                    """, (self.ID,))
+
+                for row in cur.fetchall():
+                    ds = OrderedDict()
+                    ds["proc_level"] = row["proc_level"]
+                    ds["scene_ID"] = row["sceneid"]
+                    ds["dataset_ID"] = row["datasetid"]
+                    ds["image_type"] = row["image_type"]
+                    ds["satellite"] = row["satellite"]
+                    ds["sensor"] = row["sensor"]
+                    ds["subsystem"] = row["subsystem"]