geomultisens / gms_preprocessing · Commits

Commit ba661306
authored Nov 23, 2017 by Daniel Scheffler

Removed deprecated config.Job.call_type parameter.

Former-commit-id: 74cad5e3
Former-commit-id: 068e26cb

parent 7ad99c80
Changes 9
gms_preprocessing/config.py
@@ -6,9 +6,7 @@ import os
 import warnings
 import numpy as np
 import builtins
 import glob
 import re
 import sys
 import psycopg2
 import psycopg2.extras
 from collections import OrderedDict
@@ -21,12 +19,11 @@ from cerberus import Validator
 __author__ = 'Daniel Scheffler'


-def set_config(call_type, job_ID, exec_mode='Python', db_host='localhost', reset=False, job_kwargs=None):
-    # type: (str, int, str, str, bool, dict) -> None
+def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, job_kwargs=None):
+    # type: (int, str, str, bool, dict) -> None
     """Set up a configuration for a new gms_preprocessing job!
-    :param call_type:   'console' or 'webapp'
     :param job_ID:      job ID of the job to be executed, e.g. 123456 (must be present in database)
     :param exec_mode:   'Python': writes intermediate results to disk in order to save memory
                         'Flink': keeps intermediate results in memory in order to save IO time
@@ -37,7 +34,7 @@ def set_config(call_type, job_ID, exec_mode='Python', db_host='localhost', reset
     if not hasattr(builtins, 'GMS_job') or not hasattr(builtins, 'GMS_usecase') or reset:
         job_kwargs = job_kwargs if job_kwargs else {}
-        builtins.GMS_job = Job(call_type, job_ID, exec_mode=exec_mode, db_host=db_host, **job_kwargs)
+        builtins.GMS_job = Job(job_ID, exec_mode=exec_mode, db_host=db_host, **job_kwargs)
         builtins.GMS_usecase = Usecase(getattr(builtins, 'GMS_job'))
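With call_type gone, a job is configured from its database ID alone. A minimal usage sketch (the job ID below is a placeholder and must exist in the 'jobs' table of the postgreSQL database):

    from gms_preprocessing import config

    # 123456 is a hypothetical job ID; set_config() looks it up in the database.
    config.set_config(job_ID=123456, exec_mode='Python', db_host='localhost')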
@@ -73,7 +70,7 @@ GMS_config = GMS_configuration()
 class Job(object):
-    def __init__(self, call_type, ID, exec_mode='Python', db_host='localhost', exec_L1AP=None, exec_L1BP=None,
+    def __init__(self, ID, exec_mode='Python', db_host='localhost', exec_L1AP=None, exec_L1BP=None,
                  exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
                  allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO',
                  tiling_block_size_XY=(2048, 2048), is_test=False, profiling=False, benchmark_global=False,
@@ -81,7 +78,6 @@ class Job(object):
         """Create a job configuration
-        :param call_type:   'console' or 'webapp'
         :param ID:          job ID of the job to be executed, e.g. 123456 (must be present in database)
         :param exec_mode:   'Python': writes intermediate results to disk in order to save memory
                             'Flink': keeps intermediate results in memory in order to save IO time
@@ -120,7 +116,6 @@ class Job(object):
         # args
         self.ID = ID
-        self.call_type = call_type  # FIXME deprecated
         self.exec_mode = exec_mode
         assert exec_mode in ['Flink', 'Python']
         self.db_host = db_host
@@ -221,8 +216,7 @@ class Job(object):
     @property
     def conn_database(self):
-        return "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=3" % self.db_host \
-            if self.call_type == 'webapp' else ''
+        return "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=3" % self.db_host

     def get_init_argskwargs(self, ignore=("logger",)):
         """
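Since the console branch is removed, conn_database now always returns the postgreSQL DSN string. A hedged sketch of consuming it (assuming an already initialized Job instance named job):

    import psycopg2

    # 'job' stands for a config.Job instance, e.g. builtins.GMS_job after set_config().
    connection = psycopg2.connect(job.conn_database)
    connection.close()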
@@ -304,69 +298,39 @@ class Usecase:
         def query_vir(col, VSID): return get_info_from_postgreSQLdb(_job.conn_database, 'virtual_sensors', col,
                                                                     {'id': VSID})[0][0]

-        if _job.call_type == 'console':
-            self.filt_coord = [None, None, None, None]
-            # filt_datasets = ['ALOS', 'Terra', 'Landsat', 'SPOT', 'RapidEye', 'SRTM', 'ATM']
-            # filt_datasets = ['ALOS', 'Terra', 'SPOT', 'RapidEye', 'SRTM', 'ATM']
-            self.filt_datasets = ['ALOS', 'Terra', 'Landsat', 'SPOT', 'RapidEye']
-            # filt_datasets = ['Terra']
-            # filt_datasets = ['Landsat']
-            # filt_datasets = ['ALOS']
-            # filt_datasets = ['SPOT']
-            # filt_datasets = ['RapidEye','ALOS']
-            # filt_datasets = ['RapidEye']
-            # filt_datasets = ['Landsat','SPOT','RapidEye']
-            # filt_datasets = ['Landsat','SPOT']
-            self.filt_date = [2000, 2015]
-            # filt_date = [2012,2015]
-            self.skip_thermal = True
-            self.skip_pan = True
-            self.sort_bands_by_cwl = True
-            self.conversion_type_optical = 'BOA_Ref'  # 'Rad' / 'TOA_Ref' / 'BOA_Ref'
-            self.conversion_type_thermal = 'Rad'  # 'Rad' / 'Temp'
-            self.scale_factor_TOARef = 10000
-            self.scale_factor_BOARef = 10000
-            self.scale_factor_errors_ac = 255
-            # self.virtual_sensor_id = 10  # Sentinel-2A 10m
-            self.virtual_sensor_id = 1  # Landsat-8
-            self.datasetid_spectral_ref = 249  # Sentinel-2A
-            self.target_CWL = []
-            self.target_FWHM = []
-            self.data_list = self.get_entity_IDs_within_AOI()
-        elif _job.call_type == 'webapp':
+        def query_job(col): return get_info_from_postgreSQLdb(_job.conn_database, 'jobs', col, {'id': _job.ID})[0][0]
+        # skip_thermal = int(query_cfg(_job.conn_database, 'skip_thermal'))
+        self.skip_thermal = True
+        self.skip_pan = int(query_cfg('skip_pan'))
+        self.sort_bands_by_cwl = int(query_cfg('sort_bands_by_cwl'))
+        self.conversion_type_optical = query_cfg('conversion_type_optical')
+        self.conversion_type_thermal = query_cfg('conversion_type_thermal')
+        self.virtual_sensor_id = query_job('virtualsensorid')
+        self.virtual_sensor_id = self.virtual_sensor_id if self.virtual_sensor_id != -1 else 10  # Sentinel-2A 10m
+        self.virtual_sensor_name = query_vir('name', self.virtual_sensor_id)
+        self.datasetid_spatial_ref = query_job('datasetid_spatial_ref')
+        # self.datasetid_spatial_ref = 104
+        self.datasetid_spectral_ref = query_vir('spectral_characteristics_datasetid', self.virtual_sensor_id)
+        # FIXME column is empty a known datasetid as spectral characteristics virtual sensor is chosen:
+        self.target_CWL = query_vir('wavelengths_pos', self.virtual_sensor_id)
+        # FIXME column is empty a known datasetid as spectral characteristics virtual sensor is chosen:
+        self.target_FWHM = query_vir('band_width', self.virtual_sensor_id)
+        # FIXME target GSD setting is a duplicate to datasetid_spatial_ref!
+        self.target_gsd = query_vir('spatial_resolution', self.virtual_sensor_id)  # table features only 1 value for X/Y-dims
+        self.target_gsd = xgsd, ygsd = \
+            [self.target_gsd] * 2 if isinstance(self.target_gsd, (int, float)) else self.target_gsd
+        self.EPSG = query_vir('projection_epsg', self.virtual_sensor_id)
+        self.spatial_ref_gridx = np.arange(xgsd / 2., xgsd / 2. + 2 * xgsd, xgsd)  # e.g. [15, 45]
+        self.spatial_ref_gridy = np.arange(ygsd / 2., ygsd / 2. + 2 * ygsd, ygsd)
+        self.scale_factor_TOARef = int(query_cfg('scale_factor_TOARef'))
+        self.scale_factor_BOARef = int(query_cfg('scale_factor_BOARef'))
+        self.scale_factor_errors_ac = int(query_cfg('scale_factor_errors_ac'))
+        self.data_list = self.get_data_list_of_current_jobID()

         self.align_coord_grids = True  # ONLY TO FORCE DEACTIVATION OF IMAGE RESAMPLING
         self.match_gsd = True
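A quick numerical check of the reference grid definition kept above: for a 30 m ground sampling distance the anchor coordinates evaluate to [15, 45], matching the inline comment.

    import numpy as np

    xgsd = 30  # example ground sampling distance in meters
    print(np.arange(xgsd / 2., xgsd / 2. + 2 * xgsd, xgsd))  # -> [15. 45.]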
@@ -376,110 +340,7 @@ class Usecase:
         assert self.conversion_type_optical in ['Rad', 'TOA_Ref', 'BOA_Ref', 'Temp'], \
             'Unsupported conversion type: %s' % self.conversion_type_optical

-    @staticmethod
-    def get_usecase_coord_grid():
-        """consider projections of images with status georef = master"""
-        geotransform = (0, 1, 0, 0, 0, -1)  # FIXME
-        EPSG = 'EPSG:4326'  # FIXME
-        GSD_meters = 30  # default
-        return geotransform, EPSG, GSD_meters
-
-    def get_entity_IDs_within_AOI(self):  # called in console mode
-        from .model.metadata import LandsatID2dataset, get_sensormode
-
-        # parse cli arguments
-        sys.stderr.write("No scene ids from CLI received. Using old data_list.\n")
-
-        data_list = []
-        if re.search('ALOS', ','.join(self.filt_datasets)):  # sensorname has to be in HLP_F.get_GMS_sensorcode
-            data_list.append({'image_type': 'RSD', 'satellite': 'ALOS', 'sensor': 'AVNIR-2', 'subsystem': None,
-                              'acq_datetime': '2009-07-02', 'entity_ID': 'A1002553-001-P6100002-AODS-201007300008'})  # TAR-ID 1B1
-            data_list.append({'image_type': 'RSD', 'satellite': 'ALOS', 'sensor': 'AVNIR-2', 'subsystem': None,
-                              'acq_datetime': '2007-09-27', 'entity_ID': '20070927_L1B2_ALAV2A089152780'})  # extracted Folder 1B2
-            data_list.append({'image_type': 'RSD', 'satellite': 'ALOS', 'sensor': 'AVNIR-2', 'subsystem': None,
-                              'acq_datetime': '2009-07-19', 'entity_ID': '20090719_L1B2_ALAV2A185572780'})  # extracted Folder 1B2
-            data_list.append({'image_type': 'RSD', 'satellite': 'ALOS', 'sensor': 'AVNIR-2', 'subsystem': None,
-                              'acq_datetime': '2010-04-21', 'entity_ID': '20100421_L1B2_ALAV2A225832780'})  # extracted Folder 1B2
-        if re.search('Terra', ','.join(self.filt_datasets)):
-            data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'VNIR1',
-                              'acq_datetime': '2007-11-08', 'entity_ID': 'AST_L1B_00308192007061017_20071108171717_32444'})  # HDF-ID
-            data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'VNIR2',
-                              'acq_datetime': '2007-11-08', 'entity_ID': 'AST_L1B_00308192007061017_20071108171717_32444'})  # HDF-ID
-            data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'SWIR',
-                              'acq_datetime': '2007-11-08', 'entity_ID': 'AST_L1B_00308192007061017_20071108171717_32444'})  # HDF-ID
-            data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'TIR',
-                              'acq_datetime': '2007-11-08', 'entity_ID': 'AST_L1B_00308192007061017_20071108171717_32444'})  # HDF-ID
-            data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'VNIR1',
-                              'acq_datetime': '2002-06-08', 'entity_ID': 'AST_L1A_003_05262002060543_06082002144959'})  # HDF-ID
-            data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'VNIR2',
-                              'acq_datetime': '2002-06-08', 'entity_ID': 'AST_L1A_003_05262002060543_06082002144959'})  # HDF-ID
-            data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'SWIR',
-                              'acq_datetime': '2002-06-08', 'entity_ID': 'AST_L1A_003_05262002060543_06082002144959'})  # HDF-ID
-            data_list.append({'image_type': 'RSD', 'satellite': 'Terra', 'sensor': 'ASTER', 'subsystem': 'TIR',
-                              'acq_datetime': '2002-06-08', 'entity_ID': 'AST_L1A_003_05262002060543_06082002144959'})  # HDF-ID
-        if re.search('Landsat', ','.join(self.filt_datasets)):  # sensorname has to be in HLP_F.get_GMS_sensorcode
-            data_list.append({'image_type': 'RSD', 'satellite': 'Landsat-5', 'sensor': 'TM', 'subsystem': None,
-                              'acq_datetime': '1996-10-24', 'entity_ID': 'LT51510321996298XXX01'})  # TAR-ID
-            # data_list.append({'image_type':'RSD','satellite':'Landsat-7', 'sensor':'ETM+', 'subsystem':None,
-            #                   'acq_datetime':'2002-08-15', 'entity_ID':'LE70050152002227EDC00'}) # TAR-ID
-            # data_list.append({'image_type':'RSD','satellite':'Landsat-7', 'sensor':'ETM+', 'subsystem':None,
-            #                   'acq_datetime':'2000-04-02', 'entity_ID':'LE71510322000093SGS00'}) # TAR-ID
-            data_list = data_list + LandsatID2dataset([os.path.basename(i).split('.tar.gz')[0] for i in
-                                                       glob.glob(os.path.join(self._job.path_archive, 'Landsat-7/ETM+/*.tar.gz'))])  # TAR-ID
-            data_list = data_list + LandsatID2dataset([os.path.basename(i).split('.tar.gz')[0] for i in
-                                                       glob.glob(os.path.join(self._job.path_archive, 'Landsat-8/OLI_TIRS/*.tar.gz'))])  # TAR-ID
-            # data_list.append({'image_type':'RSD','satellite':'Landsat-8', 'sensor':'OLI_TIRS','subsystem':None,
-            #                   'acq_datetime':'2013-07-03', 'entity_ID':'LC81510322013184LGN00'}) # TAR-ID
-            # data_list.append({'image_type':'RSD','satellite':'Landsat-8', 'sensor':'OLI_TIRS','subsystem':None,
-            #                   'acq_datetime':'2013-06-01', 'entity_ID':'LC81510322013152LGN00'}) # TAR-ID ~6% Cloudcov
-        if re.search('SPOT', ','.join(self.filt_datasets)):
-            data_list.append({'image_type': 'RSD', 'satellite': 'SPOT-1', 'sensor': 'HRV1', 'subsystem': None,
-                              'acq_datetime': '1986-07-17', 'entity_ID': '00197112001'})
-            data_list.append({'image_type': 'RSD', 'satellite': 'SPOT-5', 'sensor': 'HRG2', 'subsystem': None,
-                              'acq_datetime': '2010-04-21', 'entity_ID': '00197112009'})
-            data_list.append({'image_type': 'RSD', 'satellite': 'SPOT-5', 'sensor': 'HRG2', 'subsystem': None,
-                              'acq_datetime': '2010-04-21', 'entity_ID': '00197112010'})
-        if re.search('RapidEye', ','.join(self.filt_datasets)):
-            data_list.append({'image_type': 'RSD', 'satellite': 'RapidEye-5', 'sensor': 'MSI', 'subsystem': None,
-                              'acq_datetime': '2014-04-23', 'entity_ID': '4357606_2014-04-23_RE5_3A_180259'})
-        if re.search('SRTM', ','.join(self.filt_datasets)):
-            data_list.append({'image_type': 'DGM', 'satellite': 'SRTM', 'sensor': 'SRTM2', 'subsystem': None,
-                              'acq_datetime': 'unknown', 'entity_ID': 'srtm-1arcsec-version2jan2015-39-42n-70-85'})
-        if re.search('ATM', ','.join(self.filt_datasets)):
-            data_list.append({'image_type': 'ATM', 'satellite': 'ATM-data', 'sensor': 'unknown', 'subsystem': None,
-                              'acq_datetime': 'unknown', 'entity_ID': 'dummy_ID'})
-
-        data_list = [dict(i) for i in data_list]
-        for ds in data_list:
-            ds['proc_level'] = 'L0A'
-            ds['acq_datetime'] = datetime.datetime.strptime(ds['acq_datetime'], '%Y-%m-%d')
-            ds['subsystem'] = '' if ds['subsystem'] is None else ds['subsystem']
-            # ds['scene_ID'] = '_'.join([ds['satellite'],ds['sensor'],ds['subsystem'],ds['entity_ID']])
-            ds['scene_ID'] = ds['entity_ID']
-            ds['sensormode'] = get_sensormode(ds)
-        if self.skip_thermal:
-            data_list = [ds for ds in data_list if not ds['subsystem'] == 'TIR']  # removes ASTER TIR in case of skip_thermal
-        if self.skip_pan:
-            data_list = [ds for ds in data_list if not ds['sensormode'] == 'P']  # removes e.g. SPOT PAN in case of skip_pan
-
-        self.data_list = data_list
-        return self.data_list
-
-    def get_data_list_of_current_jobID(self):  # called in webapp mode
+    def get_data_list_of_current_jobID(self):
         """
         Get a list of datasets to be processed from database and return it together with some metadata.
gms_preprocessing/io/input_reader.py
@@ -161,20 +161,12 @@ def get_list_GMSfiles(dataset_list, target):
     :return             [/path/to/gms_file1.gms, /path/to/gms_file1.gms]
     """
     dataset_list = [dataset_list] if not isinstance(dataset_list, list) else dataset_list

-    if CFG.job.call_type == 'webapp':
-        def get_gmsP(ds, tgt): return PG.path_generator(ds, proc_level=tgt).get_path_gmsfile()
-        GMS_list = [p for p in [get_gmsP(ds, target) for ds in dataset_list] if os.path.exists(p)]
-    else:  # CFG.job.call_type == 'console'
-        def SQLquery(ds): return DB_T.get_info_from_SQLdb(CFG.job.path_database, 'processed_data',
-                                                          ['path_procdata', 'baseN'],
-                                                          dict(image_type=ds['image_type'], entity_ID=ds['entity_ID'],
-                                                               subsystem=ds['subsystem'], proc_level=target))
-        returned_tuples = [SQLquery(ds) for ds in dataset_list]
-        all_paths, all_baseN = [rt[0] for rt in returned_tuples], [rt[1] for rt in returned_tuples]
-        def get_gmsP(dr, bN): return os.path.join(dr, '%s_%s.gms' % (bN, target))
-        GMS_list = [p for p in [get_gmsP(p, bN) for p, bN in [all_paths, all_baseN]] if os.path.exists(p)]
+    def get_gmsP(ds, tgt): return PG.path_generator(ds, proc_level=tgt).get_path_gmsfile()
+    GMS_list = [p for p in [get_gmsP(ds, target) for ds in dataset_list] if os.path.exists(p)]

     return GMS_list
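A short sketch of the retained lookup logic as a standalone helper (the import path of path_generator is an assumption; input_reader.py only shows the PG alias):

    import os
    from gms_preprocessing.misc import path_generator as PG  # assumed module path

    def existing_gms_paths(dataset_list, target):
        # One .gms path per dataset; keep only files that actually exist on disk.
        paths = [PG.path_generator(ds, proc_level=target).get_path_gmsfile() for ds in dataset_list]
        return [p for p in paths if os.path.exists(p)]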
gms_preprocessing/misc/database_tools.py
 import collections
 import csv
 import glob
 import itertools
 import os

@@ -12,7 +11,6 @@ import warnings
 from datetime import datetime
 from typing import Union  # noqa F401  # flake8 issue
 from six import PY3
 import numpy as np
 import pandas as pd
 from pandas.io.sql import pandasSQL_builder, SQLTable

@@ -74,34 +72,6 @@ def get_scene_and_dataset_infos_from_postgreSQLdb(sceneid):
     return ds

-def get_info_from_SQLdb(path_db, tablename, vals2return, cond_dict, records2fetch=0):
-    # type: (str,str,list,dict,int) -> Union[list, str]
-    """Queries an SQL database for the given parameters.
-    :param path_db:         <str> the physical path of the SQL database on disk
-    :param tablename:       <str> name of the table within the database to be queried
-    :param vals2return:     <list or str> a list of strings containing the column titles of the values to be returned
-    :param cond_dict:       <dict> a dictionary containing the query conditions in the form {'column_name':<value>}
-    :param records2fetch:   <int> number of records to be fetched (default=0: fetch unlimited records)
-    """
-    if not isinstance(vals2return, list):
-        vals2return = [vals2return]
-    assert isinstance(records2fetch, int), \
-        "get_info_from_SQLdb: Expected an integer for the argument 'records2return'. Got %s" % type(records2fetch)
-    if not os.path.isfile(path_db):
-        return 'database connection fault'
-    connection = sqlite3.connect(path_db)
-    cursor = connection.cursor()
-    condition = "WHERE " + " AND ".join(["%s=?" % (list(cond_dict.keys())[i]) for i in range(len(cond_dict))])
-    cursor.execute("SELECT " + ','.join(vals2return) + " FROM " + tablename + " " + condition, list(cond_dict.values()))
-    records2return = cursor.fetchall() if records2fetch == 0 else [cursor.fetchone()] if records2fetch == 1 else \
-        cursor.fetchmany(size=records2fetch)  # e.g. [('LE71950282003121EDC00',), ('LE71950282003105ASN00',)]
-    cursor.close()
-    connection.close()
-    return records2return

 def get_postgreSQL_value(value):
     # type: (any) -> str
     """Converts Python variable to a postgreSQL value respecting postgreSQL type casts.

@@ -1287,7 +1257,7 @@ def import_shapefile_into_postgreSQL_database(path_shp, tablename, cols2import=N
 def data_DB_updater(obj_dict):
     # type: (dict) -> None
-    """Updates the table "scenes_proc" or "mgrs_tiles_proc within a postgreSQL or an SQL database
+    """Updates the table "scenes_proc" or "mgrs_tiles_proc within the postgreSQL database
     according to the given dictionary of a GMS object.
     :param obj_dict:    <dict> a copy of the dictionary of the respective GMS object

@@ -1297,111 +1267,61 @@ def data_DB_updater(obj_dict):
     def list2str(list2convert): return ''.join([str(val) for val in list2convert])

-    if CFG.job.call_type == 'console':
-        if not os.path.isfile(CFG.job.path_database):
-            print('No internal database found. Creating a new one...')
-        connection = sqlite3.connect(CFG.job.path_database)
-        if connection is None:
-            print('Database connection could not be established. Database entry could not be created or updated.')
-        else:
-            cursor = connection.cursor()
-            fullColumnList = ['job_ID', 'job_CPUs', 'image_type', 'satellite', 'sensor', 'subsystem', 'sensormode',
-                              'acquisition_date', 'entity_ID', 'georef', 'proc_level', 'LayerBandsAssignment',
-                              'path_procdata']
-            cursor.execute('''CREATE TABLE IF NOT EXISTS processed_data (%s)''' % ', '.join(fullColumnList))
-            currentColumnList = [i[1] for i in cursor.execute("PRAGMA table_info('processed_data')").fetchall()]
-            missingColumns = [col for col in fullColumnList if col not in currentColumnList]
-            if missingColumns:  # automatic adding of missing columns
-                cursor.execute('''CREATE TABLE IF NOT EXISTS processed_data_temp (%s)''' % ', '.join(fullColumnList))
-                cursor.execute("SELECT " + ','.join(currentColumnList) + " FROM processed_data")
-                [cursor.execute("INSERT INTO processed_data_temp (%(cols)s) VALUES (%(vals)s)" %
-                                {'cols': ','.join(currentColumnList), 'vals': ','.join(['?'] * len(currentColumnList))},
-                                row) for row in cursor.fetchall()]
-                cursor.execute("DROP TABLE processed_data")
-                cursor.execute("ALTER TABLE processed_data_temp RENAME TO processed_data")
-            cursor.execute("SELECT EXISTS(SELECT 1 FROM processed_data WHERE entity_ID=? AND sensor=? AND subsystem=?)",
-                           [obj_dict['entity_ID'], obj_dict['sensor'], obj_dict['subsystem']])
-            if cursor.fetchone()[0] == 0:  # create new entry
-                new_record = [obj_dict[key] for key in fullColumnList]
-                new_record = [(''.join([str(val[li]) for li in range(len(val))])) if isinstance(val, list) else val
-                              for val in new_record]  # e.g. converts list of LayerBandsAssignment to string
-                cursor.execute("INSERT INTO processed_data VALUES (%s)" % ','.join(['?'] * len(new_record)), new_record)
-            else:  # udate existing entry
-                values2update = [obj_dict[key] for key in ['job_ID', 'job_CPUs', 'proc_level', 'path_procdata',
-                                                           'LayerBandsAssignment']]
-                values2update = [(''.join([str(val[li]) for li in range(len(val))])) if isinstance(val, list) else val
-                                 for val in values2update]  # e.g. converts list of LayerBandsAssignment to string
-                connection.execute("UPDATE processed_data set job_ID=?, job_CPUs=?, proc_level=?,path_procdata=?, \
-                                   LayerBandsAssignment=? WHERE entity_ID=? AND sensor=? AND subsystem=?",
-                                   values2update + [obj_dict['entity_ID']] + [obj_dict['sensor'], obj_dict['subsystem']])
-    else:  # call_type == 'webapp'
+    connection = psycopg2.connect(CFG.job.conn_database)
+    if connection is None:
+        print('Database connection could not be established. Database entry could not be created or updated.')
+    else:
+        if obj_dict['arr_shape'] != 'MGRS_tile':
+            table2update = 'scenes_proc'
+            dict_dbkey_objkey = {'sceneid': obj_dict['scene_ID'],
+                                 'georef': True if obj_dict['georef'] else False,
+                                 'proc_level': obj_dict['proc_level'],
+                                 'layer_bands_assignment': ''.join(obj_dict['LayerBandsAssignment']),
+                                 'bounds': Polygon(obj_dict['trueDataCornerLonLat'])}
+            matchExp = 'WHERE ' + get_postgreSQL_matchingExp('sceneid', dict_dbkey_objkey['sceneid'])
+            keys2update = ['georef', 'proc_level', 'layer_bands_assignment', 'bounds']
+        else:  # MGRS_tile
+            table2update = 'mgrs_tiles_proc'
+            def get_tile_bounds_box(bnds): return box(bnds[0], bnds[2], bnds[1], bnds[3])
+            dict_dbkey_objkey = {'sceneid': obj_dict['scene_ID'],
+                                 'scenes_proc_id': obj_dict['scenes_proc_ID'],
+                                 'mgrs_code': obj_dict['MGRS_info']['tile_ID'],
+                                 'virtual_sensor_id': CFG.usecase.virtual_sensor_id,
+                                 'proc_level': obj_dict['proc_level'],
+                                 'coreg_success': obj_dict['coreg_info']['success'],
+                                 'tile_bounds': get_tile_bounds_box(obj_dict['bounds_LonLat']),
+                                 'data_corners': Polygon(obj_dict['trueDataCornerLonLat'])}
+            matchExp = 'WHERE ' + ' AND '.join([get_postgreSQL_matchingExp(k, dict_dbkey_objkey[k])
+                                                for k in ['sceneid', 'mgrs_code', 'virtual_sensor_id']])
+            keys2update = ['scenes_proc_id', 'proc_level', 'coreg_success', 'tile_bounds', 'data_corners']
+            if obj_dict['scenes_proc_ID'] is None:
+                keys2update.remove('scenes_proc_id')
+
+        cursor = connection.cursor()
+        # check if record exists
+        execute_pgSQL_query(cursor, "SELECT EXISTS(SELECT 1 FROM %s %s)" % (table2update, matchExp))
+        # create new entry
+        if cursor.fetchone()[0] == 0:
+            keys, vals = zip(*[(k, str(get_postgreSQL_value(v))) for k, v in dict_dbkey_objkey.items()])
+            execute_pgSQL_query(cursor, "INSERT INTO %s (%s) VALUES (%s);" % (table2update, ','.join(keys), ','.join(vals)))
+        # or update existing entry
+        else:
+            setExp = 'SET ' + ','.join(['%s=%s' % (k, get_postgreSQL_value(dict_dbkey_objkey[k])) for k in keys2update])
+            execute_pgSQL_query(cursor, "UPDATE %s %s %s;" % (table2update, setExp, matchExp))

     if 'connection' in locals():
         connection.commit()
         connection.close()

 def SQL_DB_to_csv():
     if not os.path.exists(CFG.job.path_database) or not os.path.getsize(CFG.job.path_database) > 0:
         print('No database conversion to CSV performed, because DB does not exist or DB is empty.')
     else:
         connection = sqlite3.connect(CFG.job.path_database)
         cursor = connection.cursor()
         cursor.execute("SELECT * FROM processed_data")
         with open(os.path.join(os.path.dirname(CFG.job.path_database), 'data_DB.csv'), 'w' if PY3 else 'wb') as csvfile:
             csvwriter = csv.writer(csvfile)
             csvwriter.writerow([i[0] for i in cursor.description])
             csvwriter.writerows(cursor)

 def postgreSQL_table_to_csv(conn_db, path_csv, tablename):
     # GeoDataFrame.to_csv(path_csv, index_label='id')
     raise NotImplementedError  # TODO
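The existence check kept in data_DB_updater() boils down to a single SELECT EXISTS query. A minimal sketch with psycopg2 (DSN and scene ID are placeholders):

    import psycopg2

    conn = psycopg2.connect("dbname='geomultisens' user='gmsdb' password='gmsdb' host='localhost' connect_timeout=3")
    cur = conn.cursor()
    cur.execute("SELECT EXISTS(SELECT 1 FROM scenes_proc WHERE sceneid = %s)", (123456,))
    record_exists = cur.fetchone()[0]  # True -> UPDATE path, False -> INSERT path
    cur.close()
    conn.close()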
gms_preprocessing/model/metadata.py
@@ -558,18 +558,7 @@ class METADATA(object):
         if self.EntityID == '':
             self.logger.info('Scene-ID could not be extracted and has to be retrieved from %s metadata database...'
                              % self.Satellite)
-            if CFG.job.call_type == 'console':
-                DB_T.update_metaDB_if_needed(self.Satellite, self.Sensor, self.Subsystem, self.AcqDate)
-                tablename = '%s_%s_%s' % (self.Satellite.replace('-', ''), self.Sensor.replace('+', ''),
-                                          self.Subsystem) if self.Subsystem != '' else \
-                    '%s_%s' % (self.Satellite.replace('-', ''), self.Sensor.replace('+', ''))
-                tablename = tablename if tablename not in ['Landsat4_TM', 'Landsat5_TM'] else 'Landsat45_TM'
-                result = DB_T.get_info_from_SQLdb(CFG.job.path_db_meta, tablename, ['sceneID', 'sensor'],
-                                                  {'acquisitionDate': self.AcqDate, 'path': self.WRS_path,
                                                   'row': self.WRS_row}, records2fetch=1)
-            else:
-                result = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityID'],
-                                                         {'id': self.SceneID})
+            result = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityID'],
+                                                     {'id': self.SceneID})
             if len(result) == 1:  # e.g. [('LE71950282003121EDC00',)]
                 self.EntityID = result[0][0]
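After the change, the entity ID is always resolved through the postgreSQL 'scenes' table. A hedged sketch of the same lookup outside the class (module paths are assumptions based on the files shown in this commit; the scene ID is a placeholder):

    from gms_preprocessing.misc import database_tools as DB_T  # assumed import path
    from gms_preprocessing.config import GMS_config as CFG     # assumed; requires a prior set_config() call

    result = DB_T.get_info_from_postgreSQLdb(CFG.job.conn_database, 'scenes', ['entityID'], {'id': 123456})
    if len(result) == 1:  # e.g. [('LE71950282003121EDC00',)]
        entity_id = result[0][0]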
gms_preprocessing/processing/process_controller.py
@@ -38,14 +38,13 @@ __author__ = 'Daniel Scheffler'
 class process_controller(object):

-    def __init__(self, job_ID, call_type='webapp', exec_mode='Python', db_host='localhost',
+    def __init__(self, job_ID, exec_mode='Python', db_host='localhost',
                  parallelization_level='scenes', delete_old_output=False, job_config_kwargs=None):
-        # type: (int, str, str, str, str, bool) -> None
+        # type: (str, str, str, str, bool) -> None

         """gms_preprocessing process controller

         :param job_ID:                  <int> a job ID belonging to a valid database record within table 'jobs'
-        :param call_type:               <str> choices: 'webapp' and 'console'
         :param exec_mode:               <str> choices: 'Python' - writes all intermediate data to disk
                                               'Flink' - keeps all intermediate data in memory
         :param db_host:                 <str> hostname of the host where database is hosted
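A usage sketch of the slimmed-down constructor (the job ID is a placeholder; only the former 'webapp' behaviour remains):

    from gms_preprocessing.processing.process_controller import process_controller  # assumed import path

    PC = process_controller(123456, exec_mode='Python', db_host='localhost',
                            parallelization_level='scenes')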
@@ -58,14 +57,11 @@ class process_controller(object):
         # assertions
         if not isinstance(job_ID, int):
             raise ValueError("'job_ID' must be an integer value. Got %s." % type(job_ID))
-        if call_type not in ['webapp', 'console']:
-            raise ValueError("Unexpected call_type '%s'!" % call_type)
         if exec_mode not in ['Python', 'Flink']:
             raise ValueError("Unexpected exec_mode '%s'!" % exec_mode)
         if parallelization_level not in ['scenes', 'tiles']:
             raise ValueError