# -*- coding: utf-8 -*-

# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
# Copyright (C) 2020  Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

import datetime
import socket
import os
import warnings
import numpy as np
import builtins
import re
import psycopg2
import psycopg2.extras
from collections import OrderedDict
from collections.abc import Mapping
import multiprocessing
from inspect import getargvalues, stack, signature, _empty
import json
from json import JSONDecodeError
from jsmin import jsmin
from cerberus import Validator
import pkgutil
import logging
import time
import psutil
from pprint import pformat
from typing import TYPE_CHECKING

from .options_schema import gms_schema_input, gms_schema_config_output, parameter_mapping, get_param_from_json_config
from ..version import __version__, __versionalias__

if TYPE_CHECKING:
    from gms_preprocessing.misc.database_tools import GMS_JOB  # noqa F401  # flake8 issue

__author__ = 'Daniel Scheffler'


class GMS_configuration(object):
    def __getattr__(self, attr):
        if hasattr(builtins, 'GMS_JobConfig'):
            if attr in ['job', 'usecase']:
                # This is only to keep compatibility with HU-INF codes
                return getattr(builtins, 'GMS_JobConfig')
            return getattr(builtins.GMS_JobConfig, attr)
        else:
            raise EnvironmentError("Config has not been set yet on this machine. Run 'set_config()' first!")

    def __repr__(self):
        return getattr(builtins, 'GMS_JobConfig').__repr__()


GMS_config = GMS_configuration()  # type: JobConfig
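# NOTE: GMS_config is only a proxy object; attribute access is delegated to builtins.GMS_JobConfig,
# so it always reflects the job configuration set up by the most recent set_config() call.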


path_gmslib = os.path.dirname(pkgutil.get_loader("gms_preprocessing").path)
path_options_default = os.path.join(path_gmslib, 'options', 'options_default.json')


def set_config(job_ID, json_config='', reset_status=False, **kwargs):
    """Set up a configuration for a new gms_preprocessing job.

    :param job_ID:          job ID of the job to be executed, e.g. 123456 (must be present in database)
    :param json_config:     path to JSON file containing configuration parameters or a string in JSON format
    :param reset_status:    whether to reset the job status or not (default=False)
    :param kwargs:          keyword arguments to be passed to JobConfig
                            NOTE: All keyword arguments given here WILL OVERRIDE configurations that have been
                                  previously set via WebUI or via the json_config parameter!

    :Keyword Arguments:
        - inmem_serialization:     False: write intermediate results to disk in order to save memory
                                   True:  keep intermediate results in memory in order to save IO time
        - parallelization_level:   <str> choices: 'scenes' - parallelization on scene-level
                                                   'tiles'  - parallelization on tile-level
        - db_host:         host name of the server that runs the postgreSQL database
        - spatial_index_server_host:   host name of the server that runs the SpatialIndexMediator
        - spatial_index_server_port:   port used for connecting to SpatialIndexMediator
        - delete_old_output:        <bool> whether to delete previously created output of the given job ID
                                    before running the job (default = False)
        - exec_L1AP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L1BP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L1CP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L2AP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L2BP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - exec_L2CP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
        - CPUs:            number of CPU cores to be used for processing (default: None -> use all available)
        - allow_subMultiprocessing:
                           allow multiprocessing within workers
        - disable_exception_handler:
                           enable/disable automatic handling of unexpected exceptions (default: True -> enabled)
        - log_level:       the logging level to be used (choices: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL';
                           default: 'INFO')
        - tiling_block_size_XY:
                           X/Y block size to be used for any tiling process (default: (2048,2048))
        - is_test:         whether the current job represents a software test job (run by a test runner) or not
                           (default=False)
        - profiling:       enable/disable code profiling (default: False)
        - benchmark_global:
                           enable/disable benchmark of the whole processing pipeline
        - path_procdata_scenes:
                           output path to store processed scenes
        - path_procdata_MGRS:
                           output path to store processed MGRS tiles
        - path_archive:    input path where downloaded data are stored
        - virtual_sensor_id:       1: Landsat-8, 10: Sentinel-2A 10m
        - datasetid_spatial_ref:   249: Sentinel-2A
    :rtype: JobConfig
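
    :Example:

    >>> # minimal usage sketch (assumes a job with ID 123456 exists in the jobs table of the database):
    >>> config = set_config(123456, parallelization_level='scenes', CPUs=8)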
    """
    # validate kwargs
    for k in kwargs:
        if k not in parameter_mapping and k != 'db_host':
            raise ValueError("'%s' is not a valid parameter. Valid parameters are: \n%s"
                             % (k, list(parameter_mapping.keys())))

    #################################
    # set GMS_JobConfig in builtins #
    #################################
    # FIXME virtual_sensor_id and datasetid_spatial_ref are not respected by JobConfig.
    if not hasattr(builtins, 'GMS_JobConfig') or reset_status:
        builtins.GMS_JobConfig = JobConfig(job_ID, json_config=json_config, **kwargs)

    #####################
    # check environment #
    #####################
    if not hasattr(builtins, 'GMS_EnvOK') or not getattr(builtins, 'GMS_EnvOK'):
        from ..misc import environment as ENV

        logger = logging.getLogger('GMSEnvironmentChecker')
        logger.setLevel(logging.DEBUG)

        GMSEnv = ENV.GMSEnvironment()
        GMSEnv.check_dependencies()
        GMSEnv.check_read_write_permissions()
        GMSEnv.ensure_properly_activated_GDAL()
        GMSEnv.check_ecmwf_api_creds()

        # close unclosed locks from previous runs
        from ..misc.locks import release_unclosed_locks
        release_unclosed_locks()

        builtins.GMS_EnvOK = True

    return getattr(builtins, 'GMS_JobConfig')


def get_conn_database(hostname='localhost', timeout=3):
    # type: (str, int) -> str
    """Return database connection string.

    :param hostname:    the host that runs the GMS postgreSQL database
    :param timeout:     connection timeout in seconds
    :return:    the connection string to be passed to psycopg2.connect()
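
    Example:
        >>> get_conn_database('localhost')
        "dbname='geomultisens' user='gmsdb' password='gmsdb' host='localhost' connect_timeout=3"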
    """
    return "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=%d" \
           % (hostname, timeout)


class JobConfig(object):
    def __init__(self, ID, json_config='', **user_opts):
        """Create a job configuration

        Workflow:
        # 0. Environment
        # 1. The JSON options can come from two places: a console command or the database.
        #       - in case of a console command: GMS_JOB.from_... must write the default options to the database
        # => first create the JobConfig based on the JSON options
        # 2. then override them with user-defined parameters (either init parameters or database settings
        #    made via the web app)

        :param ID:          job ID of the job to be executed, e.g. 123456 (must be present in database)
        :param json_config: path to JSON file containing configuration parameters or a string in JSON format
        :param user_opts:   keyword arguments as passed by gms_preprocessing.set_config()
        """
        # private attributes
        self._DB_job_record = None  # type: GMS_JOB
        self._DB_config_table = None  # type: dict
        self._kwargs_defaults = None

        # fixed attributes
        # possible values: 'pending', 'running', 'canceled', 'failed', 'finished_with_warnings',
        # 'finished_with_errors', 'finished'
        self.status = 'pending'
        self.start_time = datetime.datetime.now()
        self.end_time = None
        self.computation_time = None
        self.hostname = socket.gethostname()
        self.version = __version__
        self.versionalias = __versionalias__

        #######################
        # POPULATE PARAMETERS #
        #######################

        # args
        self.ID = ID
        self.json_config = json_config
        self.kwargs = user_opts

        # database connection
        self.db_host = user_opts['db_host']
        self.conn_database = get_conn_database(hostname=self.db_host)

        # get validated options dict from JSON-options
        self.json_opts_fused_valid = self.get_json_opts(validate=True)

        gp = self.get_parameter

        ##################
        # global options #
        ##################

        self.inmem_serialization = gp('inmem_serialization')
        self.parallelization_level = gp('parallelization_level')
        self.spatial_index_server_host = gp('spatial_index_server_host')
        self.spatial_index_server_port = gp('spatial_index_server_port')
        self.CPUs = gp('CPUs', fallback=multiprocessing.cpu_count())
        self.CPUs_all_jobs = gp('CPUs_all_jobs')
        self.max_mem_usage = gp('max_mem_usage')
        self.critical_mem_usage = gp('critical_mem_usage')
        self.max_parallel_reads_writes = gp('max_parallel_reads_writes')
        self.allow_subMultiprocessing = gp('allow_subMultiprocessing')
        self.delete_old_output = gp('delete_old_output')
        self.disable_exception_handler = gp('disable_exception_handler')
        self.disable_IO_locks = gp('disable_IO_locks')
        self.disable_CPU_locks = gp('disable_CPU_locks')
        self.disable_DB_locks = gp('disable_DB_locks')
        self.disable_memory_locks = gp('disable_memory_locks')
        self.min_version_mem_usage_stats = gp('min_version_mem_usage_stats')
        self.log_level = gp('log_level')
        self.tiling_block_size_XY = gp('tiling_block_size_XY')
        self.is_test = gp('is_test')
        self.profiling = gp('profiling')
        self.benchmark_global = gp('benchmark_global')

        #########
        # paths #
        #########

        # external
        self.path_spatIdxSrv = self.DB_config_table['path_spatial_index_mediator_server']
        self.path_tempdir = self.absP(self.DB_config_table['path_tempdir'])
        self.path_custom_sicor_options = gp('path_custom_sicor_options')
        self.path_dem_proc_srtm_90m = self.absP(self.DB_config_table['path_dem_proc_srtm_90m'])
        self.path_spechomo_classif = gp('path_spechomo_classif')

        # internal (included in gms_preprocessing repository)
        self.path_earthSunDist = self.joinP(path_gmslib, 'database', 'earth_sun_distance',
                                            'Earth_Sun_distances_per_day_edited.csv')
        self.path_SNR_models = self.joinP(path_gmslib, 'database', 'snr')
        self.path_cloud_classif = self.joinP(path_gmslib, 'database', 'cloud_classifier')
        self.path_solar_irr = self.joinP(path_gmslib, 'database', 'solar_irradiance',
                                         'SUNp1fontenla__350-2500nm_@0.1nm_converted.txt')

        if not self.is_test:
            def get_dbpath(dbkey):
                return self.joinP(self.path_fileserver, self.DB_config_table[dbkey])

            # normal mode
            self.path_fileserver = self.DB_config_table['path_data_root']
            self.path_archive = gp('path_archive', fallback=get_dbpath('foldername_download'))
            self.path_procdata_scenes = gp('path_procdata_scenes', fallback=get_dbpath('foldername_procdata_scenes'))
            self.path_procdata_MGRS = gp('path_procdata_MGRS', fallback=get_dbpath('foldername_procdata_MGRS'))
            self.path_ECMWF_db = gp('path_ECMWF_db', fallback=self.absP(self.DB_config_table['path_ECMWF_db']))
            self.path_benchmarks = gp('path_benchmarks', fallback=self.absP(self.DB_config_table['path_benchmarks']))
            self.path_job_logs = gp('path_job_logs', fallback=get_dbpath('foldername_job_logs'))

        else:
            # software test mode, the repository should be self-contained -> use only relative paths
            self.path_fileserver = self.joinP(path_gmslib, '..', 'tests', 'data')
            self.path_archive = self.joinP(path_gmslib, '..', 'tests', 'data', 'archive_data')
            self.path_procdata_scenes = self.joinP(path_gmslib, '..', 'tests', 'data', 'output_scenes')
            # self.path_procdata_scenes = '/data1/gms/mount/db/data/processed_scenes/test_loggingerrors/'  # FIXME
            self.path_procdata_MGRS = self.joinP(path_gmslib, '..', 'tests', 'data', 'output_mgrs_tiles')
            self.path_ECMWF_db = self.joinP(path_gmslib, '..', 'tests', 'data', 'processed_ECMWF')
            self.path_benchmarks = self.joinP(path_gmslib, '..', 'tests', 'data', 'benchmarks')
            self.path_job_logs = self.joinP(path_gmslib, '..', 'tests', 'data', 'job_logs')
            # self.path_job_logs = '/data1/gms/mount/db/data/job_logs/'  # FIXME

        ###########################
        # processor configuration #
        ###########################

        # general_opts
        self.skip_thermal = gp('skip_thermal')
        self.skip_pan = gp('skip_pan')
        self.sort_bands_by_cwl = gp('sort_bands_by_cwl')
        self.target_radunit_optical = gp('target_radunit_optical')
        self.target_radunit_thermal = gp('target_radunit_thermal')
        self.scale_factor_TOARef = gp('scale_factor_TOARef')
        self.scale_factor_BOARef = gp('scale_factor_BOARef')
        self.mgrs_pixel_buffer = gp('mgrs_pixel_buffer')
        self.output_data_compression = gp('output_data_compression')
        self.write_ENVIclassif_cloudmask = gp('write_ENVIclassif_cloudmask')
        # processor specific opts

        # L1A
        self.exec_L1AP = gp('exec_L1AP')
        self.SZA_SAA_calculation_accurracy = gp('SZA_SAA_calculation_accurracy')
        self.export_VZA_SZA_SAA_RAA_stats = gp('export_VZA_SZA_SAA_RAA_stats')

        # L1B
        self.exec_L1BP = gp('exec_L1BP')
        self.skip_coreg = gp('skip_coreg')
        self.spatial_ref_min_overlap = gp('spatial_ref_min_overlap')
        self.spatial_ref_min_cloudcov = gp('spatial_ref_min_cloudcov')
        self.spatial_ref_max_cloudcov = gp('spatial_ref_max_cloudcov')
        self.spatial_ref_plusminus_days = gp('spatial_ref_plusminus_days')
        self.spatial_ref_plusminus_years = gp('spatial_ref_plusminus_years')
        self.coreg_band_wavelength_for_matching = gp('coreg_band_wavelength_for_matching')
        self.coreg_max_shift_allowed = gp('coreg_max_shift_allowed')
        self.coreg_window_size = gp('coreg_window_size')

        # L1C
        self.exec_L1CP = gp('exec_L1CP')
        self.cloud_masking_algorithm = gp('cloud_masking_algorithm')
        self.export_L1C_obj_dumps = gp('export_L1C_obj_dumps')
        self.auto_download_ecmwf = gp('auto_download_ecmwf')
        self.ac_fillnonclear_areas = gp('ac_fillnonclear_areas')
        self.ac_clear_area_labels = gp('ac_clear_area_labels')
        self.ac_scale_factor_errors = gp('ac_scale_factor_errors')
        self.ac_max_ram_gb = gp('ac_max_ram_gb')
        self.ac_estimate_accuracy = gp('ac_estimate_accuracy')
        self.ac_bandwise_accuracy = gp('ac_bandwise_accuracy')

        # L2A
        self.exec_L2AP = gp('exec_L2AP')
        self.align_coord_grids = gp('align_coord_grids')
        self.match_gsd = gp('match_gsd')
        self.spatial_resamp_alg = gp('spatial_resamp_alg')
        self.clip_to_extent = gp('clip_to_extent')
        self.spathomo_estimate_accuracy = gp('spathomo_estimate_accuracy')

        # L2B
        self.exec_L2BP = gp('exec_L2BP')
        self.spechomo_method = gp('spechomo_method')
        self.spechomo_n_clusters = gp('spechomo_n_clusters')
        self.spechomo_rfr_n_trees = 50  # this is a static config value, not a user option
        self.spechomo_rfr_tree_depth = 10  # this is a static config value, not a user option
        self.spechomo_classif_alg = gp('spechomo_classif_alg')
        self.spechomo_kNN_n_neighbors = gp('spechomo_kNN_n_neighbors')
        self.spechomo_estimate_accuracy = gp('spechomo_estimate_accuracy')
        self.spechomo_bandwise_accuracy = gp('spechomo_bandwise_accuracy')
        if self.spechomo_method == 'RFR':
            raise NotImplementedError("The spectral harmonization method 'RFR' is currently not completely "
                                      "implemented. Please use another one.")
            # FIXME RFR classifiers are missing (cannot be added to the repository due to file size > 1 GB)

        # L2C
        self.exec_L2CP = gp('exec_L2CP')

        ################################
        # target sensor specifications #
        ################################

        self.virtual_sensor_id = gp('virtual_sensor_id', attr_db_job_record='virtualsensorid')
        # FIXME Why is datasetid_spatial_ref missing in the virtual_sensors table?
        self.datasetid_spatial_ref = gp('datasetid_spatial_ref', attr_db_job_record='datasetid_spatial_ref')

        VSSpecs = self.get_virtual_sensor_specs()
        self.virtual_sensor_name = VSSpecs['name']

        # spectral specifications
        self.datasetid_spectral_ref = VSSpecs['spectral_characteristics_datasetid']  # None in case of custom sensor
        # FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
        self.target_CWL = VSSpecs['wavelengths_pos']
        # FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
        self.target_FWHM = VSSpecs['band_width']

        # spatial specifications
        target_gsd_tmp = VSSpecs['spatial_resolution']  # table features only 1 value for X/Y-dims FIXME user inputs?
        # FIXME the target GSD setting is a duplicate of datasetid_spatial_ref!
        self.target_gsd = xgsd, ygsd = \
            [target_gsd_tmp]*2 if isinstance(target_gsd_tmp, (int, float)) else target_gsd_tmp
        self.target_epsg_code = VSSpecs['projection_epsg']
        # FIXME values in case user defines only Landsat?
        self.spatial_ref_gridx = np.arange(xgsd / 2., xgsd / 2. + 2 * xgsd, xgsd).tolist()  # e.g. [15, 45]
        self.spatial_ref_gridy = np.arange(ygsd / 2., ygsd / 2. + 2 * ygsd, ygsd).tolist()
        #############
        # data list #
        #############

        self.data_list = self.get_data_list_of_current_jobID()

        ############
        # validate #
        ############
        self.validate_exec_configs()

        GMSValidator(allow_unknown=True, schema=gms_schema_config_output).validate(self.to_dict())

        # check if parallel read/write processes have been limited on a storage mountpoint shared between multiple hosts
        if self.max_parallel_reads_writes != 0:
            self.check_no_read_write_limit_on_xtfs_mountpoint()

    def get_init_argskwargs(self, ignore=("logger",)):
        """
        Return a tuple containing dictionary of calling function's. named arguments and a list of
        calling function's unnamed positional arguments.
        """

        posname, kwname, argskwargs = getargvalues(stack()[1][0])[-3:]
        argskwargs.update(argskwargs.pop(kwname, []))
        argskwargs = {k: v for k, v in argskwargs.items() if k not in ignore and k != 'self' and not k.startswith('__')}
        sig = signature(self.__init__)
        argsnames = [k for k in sig.parameters if sig.parameters[k].default == _empty]
        return {'args': {k: v for k, v in argskwargs.items() if k in argsnames},
                'kwargs': {k: v for k, v in argskwargs.items() if k not in argsnames}}

    def get_parameter(self, key_user_opts, attr_db_job_record='', fallback=None):
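        """Return the value of the given configuration parameter, resolved in order of priority.

        Resolution order (the first hit wins): 1. keyword arguments directly passed to JobConfig,
        2. the WebUI database record (only if attr_db_job_record is given), 3. the fused and
        validated JSON options, 4. the given fallback value.

        :param key_user_opts:       parameter name as used in the JSON options schema
        :param attr_db_job_record:  name of the attribute of the database job record holding the value
        :param fallback:            value to be returned if the parameter is defined nowhere else
        """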
        # 1. JobConfig parameters: parameters that are directly passed to JobConfig
        if key_user_opts in self.kwargs:
            return self.kwargs[key_user_opts]

        # 2. WebUI parameters: parameters that have been defined via WebUI
        if attr_db_job_record:
            return getattr(self.DB_job_record, attr_db_job_record)

        # 3. JSON parameters: parameters that have been defined via JSON Input (command line or advanced UI params)
        val_json = get_param_from_json_config(key_user_opts, self.json_opts_fused_valid)
        if val_json or val_json is False:
            return val_json

        # fallback: if nothing has been returned until here
        return fallback

    @property
    def DB_job_record(self):
        # type: () -> GMS_JOB
        if not self._DB_job_record:
            # check if job ID exists in database
            from ..misc.database_tools import GMS_JOB  # noqa F811  # redefinition of unused 'GMS_JOB' from line 22
            # GMS_JOB.from_job_ID() raises a ValueError if the given job ID does not exist in the database
            self._DB_job_record = GMS_JOB(self.conn_database).from_job_ID(self.ID)

        return self._DB_job_record

    @property
    def DB_config_table(self):
        # type: () -> dict
        """Returns the content of the config table of the postgreSQL database as dictionary."""
        if not self._DB_config_table:
            from ..misc.database_tools import get_info_from_postgreSQLdb
            db_cfg = dict(get_info_from_postgreSQLdb(self.conn_database, 'config', ['key', 'value']))

            # convert relative to absolute paths
            self._DB_config_table = {k: self.absP(v) if k.startswith('path_') and v.startswith('./') else v
                                     for k, v in db_cfg.items()}

        return self._DB_config_table

    def get_virtual_sensor_specs(self):
        # type: () -> dict
        """Returns the content of the virtual_sensors table of the postgreSQL database as dictionary."""
        from ..misc.database_tools import get_info_from_postgreSQLdb

        # the column spectral_characteristics_datasetid is not used later because it is given by jobs.datasetid_spatial_ref
        cols2read = ['name', 'projection_epsg', 'spatial_resolution', 'spectral_characteristics_datasetid',
                     'wavelengths_pos', 'band_width']

        res = get_info_from_postgreSQLdb(self.conn_database, 'virtual_sensors',
                                         cols2read, {'id': self.virtual_sensor_id})[0]

        VSSpecs = dict()
        for i, col in enumerate(cols2read):
            val = res[i]
            if col == 'spectral_characteristics_datasetid' and val == -1:  # nodata value
                val = None

            VSSpecs[col] = val

        return VSSpecs

    def get_json_opts(self, validate=True):
        """Get a dictionary of GMS config parameters according to the jobs table of the database.

        NOTE: Reads the default options from options_default.json and updates the values with those from database.
        """
        def update_dict(d, u):
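            """Recursively update the items of nested dict d with those of nested dict u and return d."""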
            for k, v in u.items():
                if isinstance(v, Mapping):
                    d[k] = update_dict(d.get(k, {}), v)
                else:
                    d[k] = v
            return d

        # read options_default.json
        default_options = get_options(path_options_default, validation=validate)

        #############################################
        # update default options with those from DB #
        #############################################

        if self.DB_job_record.analysis_parameter:
            try:
                params_dict = json.loads(jsmin(self.DB_job_record.analysis_parameter))
            except JSONDecodeError:
                warnings.warn('The advanced options given in the WebUI could not be decoded. '
                              'JSON decoder failed with the following error:')
                raise

            # convert values to useful data types and update the default values
            params_dict = json_to_python(params_dict)
            update_dict(default_options, params_dict)

        ###############################################################################################################
        # if a JSON config is provided (via Python bindings or CLI parser) -> override all options with that config   #
        ###############################################################################################################

        if self.json_config:
            if self.json_config.startswith("{"):
                try:
                    params_dict = json.loads(jsmin(self.json_config))
                except JSONDecodeError:
                    warnings.warn('The given JSON options string could not be decoded. '
                                  'JSON decoder failed with the following error:')
                    raise
            elif os.path.isfile(self.json_config):
                try:
                    with open(self.json_config, 'r') as inF:
                        params_dict = json.loads(jsmin(inF.read()))
                except JSONDecodeError:
                    warnings.warn('The given JSON options file %s could not be decoded. '
                                  'JSON decoder failed with the following error:' % self.json_config)
                    raise

            else:
                raise ValueError("The parameter 'json_config' must be a JSON formatted string or a JSON file on disk.")

            # convert values to useful data types and update the default values
            params_dict = json_to_python(params_dict)
            update_dict(default_options, params_dict)

        if validate:
            GMSValidator(allow_unknown=True, schema=gms_schema_input).validate(default_options)

        json_options = default_options
        return json_options

    @staticmethod
    def absP(relP):
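        """Return the absolute version of the given path, interpreted as relative to this module's directory."""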
        return os.path.abspath(os.path.join(os.path.dirname(__file__), relP))

    @staticmethod
    def joinP(*items):
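        """Join the given path components (thin wrapper around os.path.join)."""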
        return os.path.join(*items)

    def get_data_list_of_current_jobID(self):
        """
        Get a list of datasets to be processed from database and return it together with some metadata.

        :return:    <list> of OrderedDicts, e.g. [OrderedDict([('proc_level', None), ('scene_ID', 5895940),
                    ('datasetid', 104), ('image_type', 'RSD'), ('satellite', 'Landsat-8'), ('sensor', 'OLI_TIRS'),
                    ('subsystem', ''), ('acquisition_date', datetime.datetime(2015, 2, 5, 10, 2, 52)),
                    ('entity_ID', 'LC81930242015036LGN00'), ('filename', 'LC81930242015036LGN00.tar.gz'),
                    ('sensormode', 'M'), ('logger', None)]), ...]
        """
        from ..model.metadata import get_sensormode
        data_list = []
        with psycopg2.connect(self.conn_database) as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
                cur.execute(r"""
                    WITH jobs_unnested AS (
                            SELECT id, unnest(sceneids) AS sceneid FROM jobs
                        )
                    SELECT jobs_unnested.sceneid,
                           scenes.datasetid,
                           scenes.acquisitiondate,
                           scenes.entityid,
                           scenes.filename,
                           COALESCE(scenes_proc.proc_level::text, 'L1A') AS proc_level,
                           datasets.image_type,
                           satellites.name AS satellite,
                           sensors.name AS sensor,
                           subsystems.name AS subsystem
                    FROM jobs_unnested
                    LEFT OUTER JOIN scenes ON scenes.id = jobs_unnested.sceneid
                    LEFT OUTER JOIN scenes_proc ON scenes_proc.sceneid = jobs_unnested.sceneid
                    LEFT OUTER JOIN datasets ON datasets.id = datasetid
                    LEFT OUTER JOIN satellites ON satellites.id = satelliteid
                    LEFT OUTER JOIN sensors ON sensors.id = sensorid
                    LEFT OUTER JOIN subsystems ON subsystems.id = subsystemid
                    WHERE jobs_unnested.id = %s
                    """,
                            (self.ID,))

                for row in cur.fetchall():
                    ds = OrderedDict()
                    ds["proc_level"] = row["proc_level"] if not self.is_test else None
                    ds["scene_ID"] = row["sceneid"]
                    ds["dataset_ID"] = row["datasetid"]
                    ds["image_type"] = row["image_type"]
                    ds["satellite"] = row["satellite"]
                    ds["sensor"] = row["sensor"]
                    ds["subsystem"] = row["subsystem"]
                    ds["acq_datetime"] = row["acquisitiondate"]
                    ds["entity_ID"] = row["entityid"]
                    ds["filename"] = row["filename"]

                    # normalize Landsat-7 sensor name variants to 'ETM+' (the pattern matches any name containing 'ETM')
                    ds['sensor'] = 'ETM+' if re.search(r'ETM\+?', ds['sensor']) else ds['sensor']
                    if self.skip_thermal and ds['subsystem'] == 'TIR':
                        continue  # removes ASTER TIR in case of skip_thermal
                    ds['subsystem'] = '' if ds['subsystem'] is None else ds['subsystem']
                    ds['sensormode'] = get_sensormode(ds)
                    if self.skip_pan and ds['sensormode'] == 'P':
                        continue  # removes e.g. SPOT PAN in case of skip_pan

                    if re.search(r"Sentinel-2A", ds['satellite'], re.I):
                        for subsystem in ['S2A10', 'S2A20', 'S2A60']:
                            sub_ds = ds.copy()
                            sub_ds['subsystem'] = subsystem
                            data_list.append(sub_ds)
                    elif re.search(r"Sentinel-2B", ds['satellite'], re.I):
                        for subsystem in ['S2B10', 'S2B20', 'S2B60']:
                            sub_ds = ds.copy()
                            sub_ds['subsystem'] = subsystem
                            data_list.append(sub_ds)
                    elif re.search(r"Terra", ds['satellite'], re.I):
                        for subsystem in ['VNIR1', 'VNIR2', 'SWIR', 'TIR']:
                            sub_ds = ds.copy()
                            sub_ds['subsystem'] = subsystem
                            data_list.append(sub_ds)
                    else:
                        data_list.append(ds)

        self.data_list = data_list
        return self.data_list

    def validate_exec_configs(self):
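        """Check that each exec_L*P parameter is a list of three boolean-like values and fix inconsistent settings."""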
        for i in ['L1AP', 'L1BP', 'L1CP', 'L2AP', 'L2BP', 'L2CP']:
            exec_lvl = getattr(self, 'exec_%s' % i)

            if exec_lvl is None:
                continue

            # check input format
            if all([len(exec_lvl) == 3, (np.array(exec_lvl) == np.array(np.array(exec_lvl, bool), int)).all()]):
                execute, write, delete = exec_lvl

                # written output cannot be turned off in execution mode 'Python'
                if not self.inmem_serialization and execute and not write:
                    warnings.warn("If CFG.inmem_serialization is False the output writer for %s has to be enabled "
                                  "because any operations on GMS_obj.arr read the intermediate results from disk. "
                                  "Turning it on.." % i)
                    write = True
                    setattr(self, 'exec_%s' % i, [execute, write, delete])

            else:
                raise ValueError('Execution mode must be provided as list of 3 elements containing only boolean '
                                 'values. Got %s for %s.' % (exec_lvl, i))

    def check_no_read_write_limit_on_xtfs_mountpoint(self):
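        """Warn if any IO-intensive path lies on an XtreemFS mountpoint (parallel reads/writes should then be limited to 0)."""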
        intensive_IO_paths = [self.path_fileserver, self.path_archive, self.path_benchmarks,
                              self.path_dem_proc_srtm_90m, self.path_ECMWF_db, self.path_procdata_MGRS,
                              self.path_procdata_scenes]

        mount_points = {el.mountpoint: el for el in psutil.disk_partitions(all=True)}

        for path in intensive_IO_paths:
            for mp, mp_object in mount_points.items():
                if path.startswith(mp) and mp_object.device.startswith('xtreemfs'):
                    warnings.warn("Path %s appears to be on an XtreemFS mountpoint. It is highly recommended to set "
                                  "the configuration parameter 'max_parallel_reads_writes' to 0 in that case! "
                                  "Otherwise read/write processes might be slowed down! Continuing in 20 seconds.."
                                  % path)
                    time.sleep(20)
                    break

    def to_dict(self):
        """Generate a dictionary in the same structure like the one in options_default.json from the current config."""
        opts_default = get_options(path_options_default)

        # add all keys included in options_default.json
        outdict = dict()
        for key in opts_default.keys():
            if not isinstance(opts_default[key], (dict, OrderedDict)):
                outdict[key] = getattr(self, key)
            else:
                group = key
                if group not in outdict:
                    outdict[group] = dict()

                for group_key in opts_default[group]:
                    if not isinstance(opts_default[group][group_key], (dict, OrderedDict)):
                        outdict[group][group_key] = getattr(self, group_key)
                    else:
                        subgroup = group_key
                        if subgroup not in outdict[group]:
                            outdict[group][subgroup] = dict()

                        for subgroup_key in opts_default[group][subgroup]:
                            try:
                                outdict[group][subgroup][subgroup_key] = getattr(self, subgroup_key)
                            except AttributeError:
                                procexec_keys = ['run_processor', 'write_output', 'delete_output']
                                if subgroup_key in procexec_keys:
                                    proc_code = subgroup
                                    outdict[group][subgroup][subgroup_key] = \
                                        getattr(self, 'exec_%sP' % proc_code)[procexec_keys.index(subgroup_key)]
                                else:
                                    raise

        # add job metadata and the data list
        outdict.update(dict(
            job_meta={k: getattr(self, k) for k in ['ID', 'start_time', 'end_time', 'computation_time', 'hostname']},
            data_list={'dataset_%s' % i: ds for i, ds in enumerate(self.data_list)}))
        return outdict
    def to_jsonable_dict(self):
        # type: (JobConfig) -> dict
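        """Return a JSON-serializable version of the dictionary created by JobConfig.to_dict()."""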
        return python_to_json(self.to_dict())

    def save(self, path_outfile):
        """Save the JobConfig instance to a JSON file in the same structure like the one in options_default.json.

        :param path_outfile:    path of the output JSON file
        """
        with open(path_outfile, 'w') as outF:
            json.dump(self.to_jsonable_dict(), outF, skipkeys=False, indent=4)

    def __repr__(self):
        return pformat(self.to_dict())


def is_GMSConfig_available():
    try:
        if GMS_config is not None:
            return True
    except (EnvironmentError, OSError):
        return False


def json_to_python(value):
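    """Convert a value read from JSON into its Python equivalent (applied recursively to dicts and lists).

    Strings such as "None", "true"/"false", "slice(None, None, None)" and number strings are converted
    to the corresponding Python objects; any other value is returned unchanged.
    """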
    def is_number(s):
        try:
            float(s)
            return True
        except ValueError:
            return False

    if type(value) is dict:
        return {json_to_python(k): json_to_python(v) for k, v in value.items()}
    elif type(value) is list:
        return [json_to_python(v) for v in value]
    else:
        if value == "None":
            return None
        if value == "slice(None, None, None)":
            return slice(None)
        if value in [True, "true"]:
            return True
        if value in [False, "false"]:
            return False
        if is_number(value):
            try:
                if str(int(value)) != str(float(value)):
                    return int(value)
                else:
                    return float(value)
            except ValueError:
                return float(value)
        else:
            return value


def python_to_json(value):
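    """Convert a Python value into a JSON-serializable equivalent (the inverse of json_to_python)."""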
    if type(value) in [dict, OrderedDict]:
        return {python_to_json(k): python_to_json(v) for k, v in value.items()}
    elif type(value) is list:
        return [python_to_json(v) for v in value]
    elif type(value) is np.ndarray:
        return [python_to_json(v) for v in value.tolist()]
    else:
        if value is None:
            return "None"
        if value == slice(None):  # NOTE: 'is' would never match here, since slice(None) creates a new object
            return "slice(None, None, None)"
        if value is True:
            return "true"
        if value is False:
            return "false"
        if type(value) is datetime.datetime:
            return datetime.datetime.strftime(value, '%Y-%m-%d %H:%M:%S.%f%z')
        else:
            return value


class GMSValidator(Validator):
    def __init__(self, *args, **kwargs):
        """

        :param args:    Arguments to be passed to cerberus.Validator
        :param kwargs:  Keyword arguments to be passed to cerberus.Validator
        """
        super(GMSValidator, self).__init__(*args, **kwargs)

    def validate(self, document2validate, **kwargs):
        if super(GMSValidator, self).validate(document=document2validate, **kwargs) is False:
            raise ValueError("The options dictionary is malformed: %s" % str(self.errors))


def get_options(target, validation=True):
    """Return dictionary with all options.

    :param target:      path to the JSON options file to be read
    :param validation:  True / False, whether to validate the options read from the file or not
    :return: dictionary with options
    """

    if os.path.isfile(target):
        with open(target, "r") as fl:
            options = json_to_python(json.loads(jsmin(fl.read())))

        if validation is True:
            GMSValidator(allow_unknown=True, schema=gms_schema_input).validate(options)

        return options
    else:
        raise FileNotFoundError("Options file not found at file path %s." % target)