Commit d3978b20 authored by Daniel Scheffler's avatar Daniel Scheffler

Merge branch 'enhancement/provide_more_options'


Former-commit-id: e0311927
Former-commit-id: 0a8f3730
parents 1a98521d 5dc46abd
Pipeline #2498 canceled with stage
......@@ -61,7 +61,7 @@ test-all: ## run tests on every Python version with tox
tox
coverage: clean-test ## check code coverage quickly with the default Python
coverage run --source gms_preprocessing setup.py test
coverage run --source gms_preprocessing --source bin setup.py test
coverage combine # must be called in order to make coverage work in multiprocessing
coverage report -m
coverage html
......@@ -71,8 +71,8 @@ nosetests: clean-test ## Runs nosetests with coverage, xUnit and nose-html-outpu
## - puts the coverage results in the folder 'htmlcov'
## - generates 'nosetests.html' (--with-html)
## - generates 'nosetests.xml' (--with-xunit) which is currently not visualizable by GitLab
nosetests -vv --with-coverage --cover-package=gms_preprocessing --cover-erase --cover-html --cover-html-dir=htmlcov \
--with-html --with-xunit --rednose --force-color
nosetests -vv --with-coverage --cover-package=gms_preprocessing --cover-package=bin --cover-erase --cover-html \
--cover-html-dir=htmlcov --with-html --with-xunit --rednose --force-color
docs: ## generate Sphinx HTML documentation, including API docs
rm -f docs/gms_preprocessing.rst
......
......@@ -3,6 +3,7 @@ __author__ = 'Daniel Scheffler'
import argparse
import warnings
import os
import matplotlib
......@@ -10,6 +11,17 @@ matplotlib.use('Agg', warn=False) # switch matplotlib backend to 'Agg' and disa
from gms_preprocessing import process_controller, __version__ # noqa: E402
from gms_preprocessing.misc.database_tools import GMS_JOB # noqa: E402
from gms_preprocessing.options.config import get_conn_database # noqa: E402
from gms_preprocessing.options.config import path_options_default # noqa: E402
from gms_preprocessing.options.config import get_options # noqa: E402
from gms_preprocessing.options.config import get_config_kwargs_default # noqa: E402
options_default = get_options(path_options_default, validation=True) # type: dict
config_kwargs_default = get_config_kwargs_default() # type: dict
def get_config_kwargs_from_cli_args(cli_args):
return {k: v for k, v in cli_args.__dict__.items() if k in config_kwargs_default.keys()}
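For illustration only (argument values assumed, not part of this commit): the new helper keeps just those CLI attributes that set_config() accepts, so argparse-internal fields such as 'func' are silently dropped.

# hypothetical illustration of get_config_kwargs_from_cli_args()
import argparse
ns = argparse.Namespace(db_host='geoms', comment='test run', func=print)
defaults = {'db_host': 'localhost', 'comment': ''}    # stand-in for config_kwargs_default
filtered = {k: v for k, v in ns.__dict__.items() if k in defaults.keys()}
# filtered == {'db_host': 'geoms', 'comment': 'test run'}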
def run_from_jobid(args):
......@@ -19,12 +31,15 @@ def run_from_jobid(args):
# TODO download: run only the downloader
# set up process controller instance
PC = process_controller(args.jobid, parallelization_level='scenes', db_host='geoms') # FIXME hardcoded host
# PC.path_procdata_scenes = '/geoms/data/processed_scenes_dev'
# PC.path_procdata_MGRS = '/geoms/data/processed_mgrs_tiles_dev'
PC = process_controller(args.jobid, **get_config_kwargs_from_cli_args(args))
# run the job
PC.run_all_processors()
if 'GMS_IS_TEST' in os.environ and os.environ['GMS_IS_TEST'] == 'True':
# in case of software test, it is enough to get an instance of process controller because all inputs are
# validated within options.config.Job_Config (indirectly called by ProcessController.__init__() )
pass
else:
PC.run_all_processors()
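How this test shortcut is meant to be driven can be seen in the new CLI tests further down; a minimal sketch (job ID taken from those tests, everything else assumed):

# sketch only: constructor-only run, processors are skipped while GMS_IS_TEST is set
import os
os.environ['GMS_IS_TEST'] = 'True'
parser = get_gms_argparser()
parsed = parser.parse_args(['jobid', '26186261'])
parsed.func(parsed)                     # dispatches to run_from_jobid(parsed)
del os.environ['GMS_IS_TEST']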
def run_from_sceneids(args):
......@@ -32,19 +47,12 @@ def run_from_sceneids(args):
warnings.warn('Currently the console argument parser expects the given scenes as already downloaded.') # TODO
# create a new processing job from scene IDs
db_connection = "dbname='geomultisens' user='gmsdb' password='gmsdb' host='localhost' connect_timeout=3" # TODO
warnings.warn('Currently the console argument parser expects the database at localhost.') # TODO
virtual_sensor_id = 1 # TODO
warnings.warn('Currently the console argument parser sets the virtual sensor ID to 1.') # TODO
datasetid_spatial_ref = 249 # TODO
warnings.warn('Currently the console argument parser sets the dataset ID of the spatial reference to 249.') # TODO
dbJob = GMS_JOB(db_connection)
dbJob = GMS_JOB(get_conn_database(args.db_host))
dbJob.from_sceneIDlist(list_sceneIDs=args.sceneids,
virtual_sensor_id=virtual_sensor_id,
datasetid_spatial_ref=datasetid_spatial_ref,
comment='')
_run_job(dbJob)
virtual_sensor_id=args.virtual_sensor_id,
datasetid_spatial_ref=args.datasetid_spatial_ref,
comment=args.comment)
_run_job(dbJob, **get_config_kwargs_from_cli_args(args))
def run_from_entityids(args):
......@@ -53,20 +61,12 @@ def run_from_entityids(args):
:param args:
:return:
"""
db_connection = "dbname='geomultisens' user='gmsdb' password='gmsdb' host='localhost' connect_timeout=3" # TODO
warnings.warn('Currently the console argument parser expects the database at localhost.') # TODO
virtual_sensor_id = 1 # TODO
warnings.warn('Currently the console argument parser sets the virtual sensor ID to 1.') # TODO
datasetid_spatial_ref = 249 # TODO
warnings.warn('Currently the console argument parser sets the dataset ID of the spatial reference to 249.') # TODO
dbJob = GMS_JOB(db_connection)
dbJob = GMS_JOB(get_conn_database(args.db_host))
dbJob.from_entityIDlist(list_entityids=args.entityids,
virtual_sensor_id=virtual_sensor_id,
datasetid_spatial_ref=datasetid_spatial_ref,
comment='')
_run_job(dbJob)
virtual_sensor_id=args.virtual_sensor_id,
datasetid_spatial_ref=args.datasetid_spatial_ref,
comment=args.comment)
_run_job(dbJob, **get_config_kwargs_from_cli_args(args))
def run_from_filenames(args):
......@@ -75,20 +75,12 @@ def run_from_filenames(args):
:param args:
:return:
"""
db_connection = "dbname='geomultisens' user='gmsdb' password='gmsdb' host='localhost' connect_timeout=3" # TODO
warnings.warn('Currently the console argument parser expects the database at localhost.') # TODO
virtual_sensor_id = 1 # TODO
warnings.warn('Currently the console argument parser sets the virtual sensor ID to 1.') # TODO
datasetid_spatial_ref = 249 # TODO
warnings.warn('Currently the console argument parser sets the dataset ID of the spatial reference to 249.') # TODO
dbJob = GMS_JOB(db_connection)
dbJob = GMS_JOB(get_conn_database(args.db_host))
dbJob.from_filenames(list_filenames=args.filenames,
virtual_sensor_id=virtual_sensor_id,
datasetid_spatial_ref=datasetid_spatial_ref,
comment='')
_run_job(dbJob)
virtual_sensor_id=args.virtual_sensor_id,
datasetid_spatial_ref=args.datasetid_spatial_ref,
comment=args.comment)
_run_job(dbJob, **get_config_kwargs_from_cli_args(args))
def run_from_constraints(args):
......@@ -97,32 +89,35 @@ def run_from_constraints(args):
raise NotImplementedError
def _run_job(dbJob, parallelization_level='scenes'):
def _run_job(dbJob, **config_kwargs):
# type: (GMS_JOB) -> None
"""
:param dbJob:
:return:
"""
# create a database record for the given job
dbJob.create()
jobid = dbJob.id
# set up process controller instance
warnings.warn("Currently the console argument parser sets the parallelization level to 'scenes'.") # TODO
PC = process_controller(jobid, parallelization_level=parallelization_level)
# PC.path_procdata_scenes = '/geoms/data/processed_scenes_dev'
# PC.path_procdata_MGRS = '/geoms/data/processed_mgrs_tiles_dev'
PC = process_controller(dbJob.id, **config_kwargs)
# run the job
PC.run_all_processors()
if 'GMS_IS_TEST' in os.environ and os.environ['GMS_IS_TEST'] == 'True':
# in case of software test, it is enough to get an instance of process controller because all inputs are
# validated within options.config.Job_Config (indirectly called by ProcessController.__init__() )
pass
else:
PC.run_all_processors()
def get_gms_argparser():
"""Return argument parser for run_gms.py program."""
# CONFIGURE MAIN PARSER FOR THE GEOMULTISENS PREPROCESSING CHAIN
##################################################################
# CONFIGURE MAIN PARSER FOR THE GEOMULTISENS PREPROCESSING CHAIN #
##################################################################
parser = argparse.ArgumentParser(
prog='run_gms.py',
description='=' * 70 + '\n' + 'GeoMultiSens preprocessing console argument parser. '
......@@ -133,40 +128,86 @@ def get_gms_argparser():
parser.add_argument('--version', action='version', version=__version__)
subparsers = parser.add_subparsers()
#################################################################
# CONFIGURE SUBPARSERS FOR THE GEOMULTISENS PREPROCESSING CHAIN #
#################################################################
##############################################
# define parsers containing common arguments #
##############################################
general_opts_parser = argparse.ArgumentParser(add_help=False)
gop_p = general_opts_parser.add_argument
gop_p('-jc', '--json_config', nargs='?', type=str,
help='file path of a JSON file containing options. See here for an example: '
'https://gitext.gfz-potsdam.de/geomultisens/gms_preprocessing/'
'blob/master/gms_preprocessing/options/options_default.json')
# CONFIGURE SUBPARSERS FOR THE GEOMULTISENS PREPROCESSING CHAIN
parser_jobid = subparsers\
.add_parser('jobid', description='Run a GeoMultiSens preprocessing job using an already existing job ID.',
help="Run a GeoMultiSens preprocessing job using an already existing job ID (Sub-Parser).")
# '-exec_L1AP': dict(nargs=3, type=bool, help="L1A Processor configuration",
# metavar=tuple("[run processor, write output, delete output]".split(' ')), default=[1, 1, 1]),
parser_sceneids = subparsers\
.add_parser('sceneids', description='Run a GeoMultiSens preprocessing job for a given list of scene IDs.',
help="Run a GeoMultiSens preprocessing job for a given list of scene IDs (Sub-Parser).")
gop_p('-DH', '--db_host', nargs='?', type=str,
default=options_default["global_opts"]["db_host"],
help='host name of the server that runs the postgreSQL database')
parser_entityids = subparsers\
.add_parser('entityids', description='Run a GeoMultiSens preprocessing job for a given list of entity IDs.',
help="Run a GeoMultiSens preprocessing job for a given list of entity IDs (Sub-Parser).")
gop_p('-vid', '--virtual_sensor_id', type=int,
default=options_default["usecase"]["virtual_sensor_id"],
help='ID of the target (virtual) sensor')
parser_filenames = subparsers\
.add_parser('filenames', description='Run a GeoMultiSens preprocessing job for a given list of filenames of '
'downloaded satellite image archives!',
help="Run a GeoMultiSens preprocessing job for a given list of filenames of downloaded satellite "
"image archives! (Sub-Parser).")
gop_p('-dsid_spat', '--datasetid_spatial_ref', type=int,
default=options_default["usecase"]["datasetid_spatial_ref"],
help='dataset ID of the spatial reference')
parser_constraints = subparsers\
.add_parser('constraints', description='Run a GeoMultiSens preprocessing job matching the given constraints.',
help="Run a GeoMultiSens preprocessing job matching the given constraints (Sub-Parser).")
gop_p('-c', '--comment', nargs='?', type=str,
default='',
help='comment concerning the job')
# parse_from_sceneids = subparsers.add_parser('from_sceneids',
# description='Run a GeoMultiSens preprocessing job for a given list of scene IDs.',
# help="use '>>> python /path/to/GeMultiSens/run_gms.py from_sceneids -h' for documentation and usage hints")
##################
# add subparsers #
##################
subparsers = parser.add_subparsers()
parser_jobid = subparsers.add_parser(
'jobid', parents=[general_opts_parser],
description='Run a GeoMultiSens preprocessing job using an already existing job ID.',
help="Run a GeoMultiSens preprocessing job using an already existing job ID (Sub-Parser).")
parser_sceneids = subparsers.add_parser(
'sceneids', parents=[general_opts_parser],
description='Run a GeoMultiSens preprocessing job for a given list of scene IDs.',
help="Run a GeoMultiSens preprocessing job for a given list of scene IDs (Sub-Parser).")
parser_entityids = subparsers.add_parser(
'entityids', parents=[general_opts_parser],
description='Run a GeoMultiSens preprocessing job for a given list of entity IDs.',
help="Run a GeoMultiSens preprocessing job for a given list of entity IDs (Sub-Parser).")
parser_filenames = subparsers.add_parser(
'filenames', parents=[general_opts_parser],
description='Run a GeoMultiSens preprocessing job for a given list of filenames of '
'downloaded satellite image archives!',
help="Run a GeoMultiSens preprocessing job for a given list of filenames of downloaded satellite "
"image archives! (Sub-Parser).")
parser_constraints = subparsers.add_parser(
'constraints', parents=[general_opts_parser],
description='Run a GeoMultiSens preprocessing job matching the given constraints.',
help="Run a GeoMultiSens preprocessing job matching the given constraints (Sub-Parser).")
#################
# ADD ARGUMENTS #
#################
##########################
# add individual arguments #
##########################
# ADD ARGUMENTS
# add arguments to parser_jobid
jid_p = parser_jobid.add_argument
jid_p('jobid', type=int, help='job ID of an already created GeoMultiSens preprocessing job (must be present in the '
'jobs table of the database)')
jid_p('jobid', type=int, help='job ID of an already created GeoMultiSens preprocessing job '
'(must be present in the jobs table of the database)')
# add arguments to parser_sceneids
sid_p = parser_sceneids.add_argument
......@@ -191,26 +232,10 @@ def get_gms_argparser():
# con_p('constraints', nargs='+', type=str, help="list of entity IDs corresponding to valid records within the "
# "'scenes' table of the database")
# add general arguments # TODO add these configs to each subparser
general_opts = {
'-db_host': dict(),
'-exec_mode': dict(nargs=3, type=bool, help="L1A Processor configuration",
metavar=tuple("[run processor, write output, delete output]".split(' ')), default=[1, 1, 1]),
'-exec_L1AP': dict(),
'-exec_L1BP': dict(),
'-exec_L1CP': dict(),
'-exec_L2AP': dict(),
'-exec_L2BP': dict(),
'-exec_L2CP': dict(),
'-sub_multiProc': dict(),
'-exc_handler': dict(),
'-blocksize': dict(),
'-profiling': dict(),
'-bench_all': dict(),
}
# LINK PARSERS TO RUN FUNCTIONS
#################################
# LINK PARSERS TO RUN FUNCTIONS #
#################################
parser_jobid.set_defaults(func=run_from_jobid)
parser_sceneids.set_defaults(func=run_from_sceneids)
parser_entityids.set_defaults(func=run_from_entityids)
......
......@@ -342,6 +342,35 @@ def create_record_in_postgreSQLdb(conn_params, tablename, vals2write_dict, timeo
return newID
def delete_record_in_postgreSQLdb(conn_params, tablename, record_id, timeout=15000):
# type: (str, str, int, int) -> str
"""Delete a single record in a postgreSQL database.
:param conn_params: <str> connection parameters as provided by CFG.conn_params
:param tablename: <str> name of the table within the database to be updated
:param record_id: <int> ID of the record to be deleted
:param timeout: <int> allows to set a custom statement timeout (milliseconds)
"""
conn_params = "%s options = '-c statement_timeout=%s'" % (conn_params, timeout)
connection = psycopg2.connect(conn_params)
if connection is None:
warnings.warn('database connection fault')
return 'database connection fault'
cursor = connection.cursor()
execute_pgSQL_query(cursor, "DELETE FROM %s WHERE id=%s;" % (tablename, record_id))
execute_pgSQL_query(cursor, "SELECT id FROM %s WHERE id=%s" % (tablename, record_id))
res = cursor.fetchone()
if 'connection' in locals():
connection.commit()
connection.close()
return 'success' if res is None else 'fail'
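A usage sketch for the new helper (connection parameters and record ID are placeholder values; get_conn_database() is added in options/config.py below):

conn = get_conn_database(hostname='localhost')
res = delete_record_in_postgreSQLdb(conn, tablename='jobs', record_id=123456)
# res == 'success' if the row is gone afterwards, 'fail' if it still exists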
def get_pgSQL_geospatial_query_cond(conn_params, table2query, geomCol2use='bounds', tgt_corners_lonlat=None,
scene_ID=None, queryfunc='ST_Intersects', crossing_dateline_check=True):
assert tgt_corners_lonlat if scene_ID is None else scene_ID, "Provide either scene_ID or tgt_corners_lonlat!"
......@@ -632,6 +661,11 @@ class GMS_JOB(object):
self.failed_sceneids = []
self.ref_job_id = None
self.datacube_mgrs_tiles_proc = []
self.non_ref_datasetids = []
self.max_cloudcover = None
self.season_code = None # type: int
self.path_analysis_script = '' # TODO
self.job_mode = 'processing_only' # FIXME download/processing/...
self.jobs_table_columns = ['id', 'creationtime', 'finishtime', 'sceneids', 'timerange_start',
'timerange_end', 'bounds', 'distribution_index', 'progress', 'feedback',
'failed_sceneids', 'datasetid_spatial_ref',
......@@ -663,6 +697,10 @@ class GMS_JOB(object):
def _set_target_sensor_specs(self, virtual_sensor_id, datasetid_spatial_ref):
self.virtualsensorid = virtual_sensor_id
if not isinstance(datasetid_spatial_ref, int):
raise ValueError(datasetid_spatial_ref)
res = get_info_from_postgreSQLdb(self.conn, 'virtual_sensors', ['spatial_resolution',
"projection_epsg"], {'id': virtual_sensor_id})
assert res, \
......@@ -671,7 +709,6 @@ class GMS_JOB(object):
self.ground_spatial_sampling = [target_gsd, target_gsd] if type(target_gsd) in [int, float] else target_gsd
self.epsg = int(res[0][1])
assert isinstance(datasetid_spatial_ref, int)
self.datasetid_spatial_ref = datasetid_spatial_ref
res = get_info_from_postgreSQLdb(self.conn, 'datasets', ['name'], {'id': datasetid_spatial_ref})
assert res, \
......
......@@ -52,7 +52,8 @@ def set_config(job_ID, json_config='', exec_mode='Python', parallelization_level
exec_L1BP=None, exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO',
tiling_block_size_XY=(2048, 2048), is_test=False, profiling=False, benchmark_global=False,
path_procdata_scenes=None, path_procdata_MGRS=None, path_archive=None):
path_procdata_scenes=None, path_procdata_MGRS=None, path_archive=None, virtual_sensor_id=10,
datasetid_spatial_ref=249):
"""Set up a configuration for a new gms_preprocessing job!
:param job_ID: job ID of the job to be executed, e.g. 123456 (must be present in database)
......@@ -90,7 +91,10 @@ def set_config(job_ID, json_config='', exec_mode='Python', parallelization_level
:param path_procdata_MGRS:
output path to store processed MGRS tiles
:param path_archive: input path where downloaded data are stored
:param virtual_sensor_id: ID of the target (virtual) sensor (1: Landsat-8, 10: Sentinel-2A 10m)
:param datasetid_spatial_ref: dataset ID of the spatial reference (249: Sentinel-2A)
"""
# FIXME virtual_sensor_id and datasetid_spatial_ref are not respected by JobConfig.
if not hasattr(builtins, 'GMS_JobConfig') or reset_status:
kwargs = dict([x for x in locals().items() if x[0] != "self" and not x[0].startswith('__')])
builtins.GMS_JobConfig = JobConfig(job_ID, **kwargs)
......@@ -98,6 +102,23 @@ def set_config(job_ID, json_config='', exec_mode='Python', parallelization_level
return getattr(builtins, 'GMS_JobConfig')
def get_conn_database(hostname='localhost', timeout=3):
# type: (str, int) -> str
"""Return database connection string.
:param hostname: the host that runs the GMS postgreSQL database
:param timeout: connection timeout in seconds
:return: database connection string
"""
return "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=%d" \
% (hostname, timeout)
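The returned value is a plain libpq keyword/value connection string, e.g. (derived directly from the format string above):

>>> get_conn_database(hostname='geoms', timeout=5)
"dbname='geomultisens' user='gmsdb' password='gmsdb' host='geoms' connect_timeout=5"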
def get_config_kwargs_default():
a = getfullargspec(set_config)
return dict(zip(a.args[-len(a.defaults):], a.defaults))
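get_config_kwargs_default() pairs the trailing default-valued parameters of set_config() with their defaults via getfullargspec; the same pattern on a toy function (illustration only):

from inspect import getfullargspec

def _demo(job_ID, db_host='localhost', virtual_sensor_id=10):
    pass

spec = getfullargspec(_demo)
defaults = dict(zip(spec.args[-len(spec.defaults):], spec.defaults))
# defaults == {'db_host': 'localhost', 'virtual_sensor_id': 10}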
class JobConfig(object):
def __init__(self, ID, **user_opts):
"""Create a job configuration
......@@ -136,8 +157,7 @@ class JobConfig(object):
# database connection
self.db_host = user_opts['db_host']
self.conn_database = "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=3" \
% self.db_host
self.conn_database = get_conn_database(hostname=self.db_host)
# get validated options dict from JSON-options
json_opts = self.get_json_opts(validate=True)
......@@ -377,8 +397,7 @@ class JobConfig(object):
@property
def kwargs_defaults(self):
if not self._kwargs_defaults:
a = getfullargspec(set_config)
self._kwargs_defaults = dict(zip(a.args[-len(a.defaults):], a.defaults))
self._kwargs_defaults = get_config_kwargs_default()
return self._kwargs_defaults
......
......@@ -125,7 +125,7 @@
"usecase": { /*NOTE: These options will be not respected in the WebApp! Use the WebApp GUI instead.*/
"virtual_sensor_id": 10, /*"None": use WebApp input; 1: Landsat-8, 10: Sentinel-2A 10m*/
"datasetid_spatial_ref": "None", /*"None": use WebApp input*/
"datasetid_spatial_ref": 249, /*"None": use WebApp input*/
"datasetid_spectral_ref": 249, /*249=Sentinel-2A*/
"target_CWL": [
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
test_cli
--------
Tests for gms_preprocessing.bin.run_gms
"""
import unittest
import os
from runpy import run_path
import warnings
from gms_preprocessing import __path__
from gms_preprocessing.misc.database_tools import delete_record_in_postgreSQLdb
path_run_gms = os.path.abspath(os.path.join(__path__[0], '..', 'bin', 'run_gms.py'))
class Base_CLITester:
class Base_CLITestCase(unittest.TestCase):
baseargs = []
def setUp(self):
self.parser_run = run_path(path_run_gms)['get_gms_argparser']()
os.environ['GMS_IS_TEST'] = 'True'
def tearDown(self):
del os.environ['GMS_IS_TEST']
# delete the created test job in case the created subparser creates a new job
if self.baseargs[0] in ['sceneids', 'entityids', 'filenames']:
res = delete_record_in_postgreSQLdb(self.current_CFG.conn_database,
tablename='jobs', record_id=self.current_CFG.ID)
if res != 'success':
warnings.warn('Test job record could not be deleted from jobs table of postgreSQL database.')
@property
def current_CFG(self):
from gms_preprocessing.options.config import GMS_config
return GMS_config
####################################
# test that run for each subparser #
####################################
def test_hostname_custom(self):
parsed_args = self.parser_run.parse_args(self.baseargs +
['--db_host', 'geoms'])
parsed_args.func(parsed_args)
self.assertEqual(self.current_CFG.db_host, 'geoms')
class Test_run_jobid(Base_CLITester.Base_CLITestCase):
def setUp(self):
super().setUp()
self.baseargs = ['jobid', str(26186261)] # Landsat8_CollectionData
class Test_run_sceneids(Base_CLITester.Base_CLITestCase):
def setUp(self):
super().setUp()
self.baseargs = ['sceneids', str(32259730)] # LC81930292017233LGN00
class Test_run_entityids(Base_CLITester.Base_CLITestCase):
def setUp(self):
super().setUp()
self.baseargs = ['entityids', 'LC81930292017233LGN00']
class Test_run_filenames(Base_CLITester.Base_CLITestCase):
def setUp(self):
super().setUp()
self.baseargs = ['filenames', 'LC08_L1TP_193029_20170821_20170911_01_T1.tar.gz']
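Nesting Base_CLITestCase inside the otherwise empty Base_CLITester class is a common unittest idiom: the runner only collects the concrete subclasses, not the abstract base itself. A minimal sketch of the pattern (toy classes, values borrowed from the tests above):

import unittest

class _Shared:
    class BaseCase(unittest.TestCase):
        baseargs = None    # set by concrete subclasses

        def test_has_args(self):
            self.assertTrue(self.baseargs)

class TestJobid(_Shared.BaseCase):
    baseargs = ['jobid', '26186261']

class TestSceneids(_Shared.BaseCase):
    baseargs = ['sceneids', '32259730']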