# -*- coding: utf-8 -*-

import datetime
import socket
import os
import warnings
import numpy as np
import builtins
import re
import psycopg2
import psycopg2.extras
from collections import OrderedDict
import multiprocessing
from inspect import getargvalues, stack, getfullargspec, signature, _empty
import json
from json import JSONDecodeError
from jsmin import jsmin
from cerberus import Validator
import pkgutil
from pprint import pformat
from typing import TYPE_CHECKING

from .options_schema import gms_schema

if TYPE_CHECKING:
    from gms_preprocessing.misc.database_tools import GMS_JOB  # noqa F401  # flake8 issue


__author__ = 'Daniel Scheffler'


class GMS_configuration(object):
    def __getattr__(self, attr):
        if hasattr(builtins, 'GMS_JobConfig'):
            if attr in ['job', 'usecase']:
                # This is only to keep compatibility with HU-INF codes
                return getattr(builtins, 'GMS_JobConfig')
            return getattr(builtins.GMS_JobConfig, attr)
        else:
            raise EnvironmentError("Config has not been set yet on this machine. Run 'set_config()' first!")


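# module-level proxy object: attribute access is delegated to the builtins.GMS_JobConfig
# instance created by set_config()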
GMS_config = GMS_configuration()


path_gmslib = os.path.dirname(pkgutil.get_loader("gms_preprocessing").path)
path_options_default = os.path.join(path_gmslib, 'options', 'options_default.json')


def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, json_config='', exec_L1AP=None,
               exec_L1BP=None, exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
               allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO',
               tiling_block_size_XY=(2048, 2048), is_test=False, profiling=False, benchmark_global=False,
               path_procdata_scenes=None, path_procdata_MGRS=None, path_archive=None):
    """Set up a configuration for a new gms_preprocessing job!

    :param job_ID:          job ID of the job to be executed, e.g. 123456 (must be present in database)
    :param exec_mode:       'Python': writes intermediate results to disk in order to save memory
                            'Flink': keeps intermediate results in memory in order to save IO time
    :param db_host:         host name of the server that runs the postgreSQL database
    :param reset:           whether to reset the job status or not (default=False)
    :param json_config:     path to JSON file containing configuration parameters or a string in JSON format
    :param exec_L1AP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
    :param exec_L1BP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
    :param exec_L1CP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
    :param exec_L2AP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
    :param exec_L2BP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
    :param exec_L2CP:       list of 3 elements: [run processor, write output, delete output if not needed anymore]
    :param CPUs:            number of CPU cores to be used for processing (default: None -> use all available)
    :param allow_subMultiprocessing:
                            allow multiprocessing within workers
    :param disable_exception_handler:
                            disable automatic handling of unexpected exceptions (default: True -> handler disabled)
    :param log_level:       the logging level to be used (choices: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL';
                            default: 'INFO')
    :param tiling_block_size_XY:
                            X/Y block size to be used for any tiling process (default: (2048, 2048))
    :param is_test:         whether the current job represents a software test job (run by a test runner) or not
                            (default=False)
    :param profiling:       enable/disable code profiling (default: False)
    :param benchmark_global:
                            enable/disable benchmark of the whole processing pipeline
    :param path_procdata_scenes:
                            output path to store processed scenes
    :param path_procdata_MGRS:
                            output path to store processed MGRS tiles
    :param path_archive:    input path where downloaded data are stored
    """
    if not hasattr(builtins, 'GMS_JobConfig') or reset:
        kwargs = dict([x for x in locals().items() if x[0] != "self" and not x[0].startswith('__')])
        builtins.GMS_JobConfig = JobConfig(job_ID, **kwargs)

    return getattr(builtins, 'GMS_JobConfig')
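

# Example usage (a minimal sketch; the job ID 123456 and the host name 'geodb.example.org' are hypothetical
# placeholders - the given job ID must exist in the 'jobs' table of the database; the import path assumes
# this module is importable as gms_preprocessing.options.config):
#
#     from gms_preprocessing.options.config import set_config, GMS_config
#
#     set_config(123456, db_host='geodb.example.org', CPUs=8, log_level='DEBUG')
#     print(GMS_config.CPUs)  # all job parameters are then globally accessible via GMS_config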


class JobConfig(object):
    def __init__(self, ID, db_host='localhost', **user_opts):
        """Create a job configuration

        Workflow:
        # 0. Environment
        # 1. the JSON options can come from two sources: a console command or the database
        #       - console command: GMS_JOB.from_... must write the default options to the database
        # => first create the JobConfig based on the JSON options
        # 2. then override them with user-defined parameters (init parameters or DB settings set via the WebUI)

        :param ID:              job ID of the job to be executed, e.g. 123456 (must be present in database)
        :param db_host:         host name of the server that runs the postgreSQL database
        """
        # privates
        self._DB_job_record = None  # type: GMS_JOB
        self._DB_config_table = None  # type: dict
        self._kwargs_defaults = None

        # fixed attributes
        # possible values: 'pending', 'running', 'canceled', 'failed', 'finished_with_warnings',
        # 'finished_with_errors', 'finished'
        self.status = 'pending'
        self.start_time = datetime.datetime.now()
        self.end_time = None
        self.computation_time = None
        self.hostname = socket.gethostname()

        #######################
        # POPULATE PARAMETERS #
        #######################

        # args
        self.ID = ID
        self.db_host = db_host
        self.kwargs = user_opts

        # database connection
        self.conn_database = "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=3" \
                             % self.db_host

        # get validated options dict from JSON-options
        json_opts = self.get_json_opts(validate=True)

        gp = self.get_parameter

        ##################
        # global options #
        ##################

        json_globts = json_opts['global_opts']  # type: dict

        self.exec_mode = \
            gp('exec_mode', json_globts['exec_mode'])
        self.CPUs = \
            gp('CPUs', json_globts['CPUs'], fallback=multiprocessing.cpu_count())
        self.allow_subMultiprocessing = \
            gp('allow_subMultiprocessing', json_globts['allow_subMultiprocessing'])
        self.disable_exception_handler = \
            gp('disable_exception_handler', json_globts['disable_exception_handler'])
        self.log_level = \
            gp('log_level', json_globts['log_level'])
        self.tiling_block_size_XY = \
            gp('tiling_block_size_XY', json_globts['tiling_block_size_XY'])
        self.is_test = \
            gp('is_test', json_globts['is_test'])
        self.profiling = \
            gp('profiling', json_globts['profiling'])
        self.benchmark_global = \
            gp('benchmark_global', json_globts['benchmark_global'])

        #########
        # paths #
        #########
        json_paths = json_opts['paths']  # type: dict

        self.path_spatIdxSrv = self.DB_config_table['path_spatial_index_mediator_server']
        self.path_tempdir = self.absP(self.DB_config_table['path_tempdir'])
        self.path_ac_tables = self.absP(self.DB_config_table['path_ac_tables'])
        self.path_SNR_models = self.absP(self.DB_config_table['path_SNR_models'])
        self.path_dem_proc_srtm_90m = self.absP(self.DB_config_table['path_dem_proc_srtm_90m'])

        if not self.is_test:
            # normal mode
            self.path_fileserver = self.DB_config_table['path_data_root']

            self.path_archive = \
                gp('path_archive', json_paths['path_archive'],
                   fallback=self.joinP(self.path_fileserver, self.DB_config_table['foldername_download']))

            self.path_procdata_scenes = \
                gp('path_procdata_scenes', json_paths['path_procdata_scenes'],
                   fallback=self.joinP(self.path_fileserver, self.DB_config_table['foldername_procdata_scenes']))

            self.path_procdata_MGRS = \
                gp('path_procdata_MGRS', json_paths['path_procdata_MGRS'],
                   fallback=self.joinP(self.path_fileserver, self.DB_config_table['foldername_procdata_MGRS']))

            self.path_earthSunDist = self.absP(self.DB_config_table['path_earthSunDist'])
            self.path_SRFs = self.absP(self.DB_config_table['path_SRFs'])
            self.path_cloud_classif = self.absP(self.DB_config_table['path_cloud_classif'])
            self.path_solar_irr = self.absP(self.DB_config_table['path_solar_irr'])
            self.path_ECMWF_db = self.absP(self.DB_config_table['path_ECMWF_db'])

            self.path_benchmarks = \
                gp('path_benchmarks', json_paths['path_benchmarks'],
                   fallback=self.absP(self.DB_config_table['path_benchmarks']))

            self.path_job_logs = \
                gp('path_job_logs', json_paths['path_job_logs'],
                   fallback=self.absP(self.DB_config_table['path_job_logs']))

        else:
            # software test mode, the repository should be self-contained -> use only relative paths
            self.path_fileserver = self.joinP(path_gmslib, '..', 'tests', 'data')
            self.path_archive = self.joinP(path_gmslib, '..', 'tests', 'data', 'archive_data')
            self.path_procdata_scenes = self.joinP(path_gmslib, '..', 'tests', 'data', 'output_scenes')
            self.path_procdata_MGRS = self.joinP(path_gmslib, '..', 'tests', 'data', 'output_mgrs_tiles')
            self.path_earthSunDist = self.joinP(path_gmslib, 'database', 'earth_sun_distance',
                                                'Earth_Sun_distances_per_day_edited.csv')
            self.path_SRFs = self.joinP(path_gmslib, 'database', 'srf')
            self.path_cloud_classif = self.joinP(path_gmslib, 'database', 'cloud_classifier')
            self.path_solar_irr = self.joinP(path_gmslib, 'database', 'solar_irradiance',
                                             'SUNp1fontenla__350-2500nm_@0.1nm_converted.txt')
            self.path_ECMWF_db = self.joinP(path_gmslib, '..', 'tests', 'data', 'processed_ECMWF')
            self.path_benchmarks = self.joinP(path_gmslib, '..', 'tests', 'data', 'benchmarks')
            self.path_job_logs = self.joinP(path_gmslib, 'logs', 'job_logs')

        ###########################
        # processor configuration #
        ###########################

        json_processors = json_opts['processors']  # type: dict

        # general_opts
        self.skip_thermal = \
            gp('skip_thermal', json_processors['general_opts']['skip_thermal'])
        self.skip_pan = \
            gp('skip_pan', json_processors['general_opts']['skip_pan'])
        self.sort_bands_by_cwl = \
            gp('sort_bands_by_cwl', json_processors['general_opts']['sort_bands_by_cwl'])
        self.conversion_type_optical = \
            gp('conversion_type_optical', json_processors['general_opts']['conversion_type_optical'])
        self.conversion_type_thermal = \
            gp('conversion_type_thermal', json_processors['general_opts']['conversion_type_thermal'])
        self.scale_factor_TOARef = \
            gp('scale_factor_TOARef', json_processors['general_opts']['scale_factor_TOARef'])
        self.scale_factor_BOARef = \
            gp('scale_factor_BOARef', json_processors['general_opts']['scale_factor_BOARef'])

        # processor specific opts

        # L1A
        self.exec_L1AP = gp('exec_L1AP', [
            json_processors['L1A']['run_processor'],
            json_processors['L1A']['write_output'],
            json_processors['L1A']['delete_output']])
        self.SZA_SAA_calculation_accurracy = \
            gp('SZA_SAA_calculation_accurracy', json_processors['L1A']['SZA_SAA_calculation_accurracy'])
        self.export_VZA_SZA_SAA_RAA_stats = \
            gp('export_VZA_SZA_SAA_RAA_stats', json_processors['L1A']['export_VZA_SZA_SAA_RAA_stats'])

        # L1B
        self.exec_L1BP = gp('exec_L1BP', [
            json_processors['L1B']['run_processor'],
            json_processors['L1B']['write_output'],
            json_processors['L1B']['delete_output']])
        self.skip_coreg = gp('skip_coreg', json_processors['L1B']['skip_coreg'])

        # L1C
        self.exec_L1CP = gp('exec_L1CP', [
            json_processors['L1C']['run_processor'],
            json_processors['L1C']['write_output'],
            json_processors['L1C']['delete_output']])
        self.cloud_masking_algorithm = \
            gp('cloud_masking_algorithm', json_processors['L1C']['cloud_masking_algorithm'])
        self.export_L1C_obj_dumps = \
            gp('export_L1C_obj_dumps', json_processors['L1C']['export_L1C_obj_dumps'])
        self.scale_factor_errors_ac = \
            gp('scale_factor_errors_ac', json_processors['L1C']['scale_factor_errors_ac'])
        self.auto_download_ecmwf = \
            gp('auto_download_ecmwf', json_processors['L1C']['auto_download_ecmwf'])

        # L2A
        self.exec_L2AP = gp('exec_L2AP', [
            json_processors['L2A']['run_processor'],
            json_processors['L2A']['write_output'],
            json_processors['L2A']['delete_output']])
        self.align_coord_grids = gp('align_coord_grids', json_processors['L2A']['align_coord_grids'])
        self.match_gsd = gp('match_gsd', json_processors['L2A']['match_gsd'])

        # L2B
        self.exec_L2BP = gp('exec_L2BP', [
            json_processors['L2B']['run_processor'],
            json_processors['L2B']['write_output'],
            json_processors['L2B']['delete_output']])

        # L2C
        self.exec_L2CP = gp('exec_L2CP', [
            json_processors['L2C']['run_processor'],
            json_processors['L2C']['write_output'],
            json_processors['L2C']['delete_output']])

        ################################
        # target sensor specifications #
        ################################

        self.virtual_sensor_id = gp('virtual_sensor_id', attr_db_job_record='virtualsensorid')
        # FIXME why is datasetid_spatial_ref missing in the virtual_sensors table?
        self.datasetid_spatial_ref = gp('datasetid_spatial_ref', attr_db_job_record='datasetid_spatial_ref')

        VSSpecs = self.get_virtual_sensor_specs()
        self.virtual_sensor_name = VSSpecs['name']

        # spectral specifications
        self.datasetid_spectral_ref = VSSpecs['spectral_characteristics_datasetid']
        # FIXME column is empty if a known datasetid is chosen as spectral characteristics virtual sensor:
        self.target_CWL = VSSpecs['wavelengths_pos']
        # FIXME column is empty if a known datasetid is chosen as spectral characteristics virtual sensor:
        self.target_FWHM = VSSpecs['band_width']

        # spatial specifications
        target_gsd_tmp = VSSpecs['spatial_resolution']  # table features only 1 value for X/Y-dims FIXME user inputs?
        # FIXME target GSD setting is a duplicate of datasetid_spatial_ref!
        self.target_gsd = xgsd, ygsd = \
            [target_gsd_tmp]*2 if isinstance(target_gsd_tmp, (int, float)) else target_gsd_tmp
        self.target_epsg_code = VSSpecs['projection_epsg']
        # FIXME values in case user defines only Landsat?
        self.spatial_ref_gridx = np.arange(xgsd / 2., xgsd / 2. + 2 * xgsd, xgsd).tolist()  # e.g. [15, 45]
        self.spatial_ref_gridy = np.arange(ygsd / 2., ygsd / 2. + 2 * ygsd, ygsd).tolist()
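        # -> two grid anchor points per dimension (e.g., [15.0, 45.0] for a GSD of 30 m)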

        #############
        # data list #
        #############

        self.data_list = self.get_data_list_of_current_jobID()

        ############
        # validate #
        ############

        self.validate_exec_configs()

    @property
    def kwargs_defaults(self):
        if not self._kwargs_defaults:
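            # map the keyword arguments of set_config() to their default values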
            a = getfullargspec(set_config)
            self._kwargs_defaults = dict(zip(a.args[-len(a.defaults):], a.defaults))

        return self._kwargs_defaults

    def get_init_argskwargs(self, ignore=("logger",)):
        """
        Return a tuple containing dictionary of calling function's. named arguments and a list of
        calling function's unnamed positional arguments.
        """

        posname, kwname, argskwargs = getargvalues(stack()[1][0])[-3:]
        argskwargs.update(argskwargs.pop(kwname, []))
        argskwargs = {k: v for k, v in argskwargs.items() if k not in ignore and k != 'self' and not k.startswith('__')}
        sig = signature(self.__init__)
        argsnames = [k for k in sig.parameters if sig.parameters[k].default == _empty]
        return {'args': {k: v for k, v in argskwargs.items() if k in argsnames},
                'kwargs': {k: v for k, v in argskwargs.items() if k not in argsnames}}

    def get_parameter(self, key_user_opts, val_json=None, attr_db_job_record='', fallback=None):
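        """Return the value of a config parameter from the highest-priority source where it is defined.

        Lookup order (first match wins): directly passed JobConfig parameter -> WebUI value from the
        database job record -> JSON options value -> fallback (or the set_config() default).
        """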
        # 1. JobConfig parameters: parameters that are directly passed to JobConfig
        if key_user_opts in self.kwargs and self.kwargs[key_user_opts] != self.kwargs_defaults[key_user_opts]:
            return self.kwargs[key_user_opts]

        # 2. WebUI parameters: parameters that have been defined via WebUI
        if attr_db_job_record:
            return getattr(self.DB_job_record, attr_db_job_record)

        # 3. JSON parameters: parameters that have been defined via JSON Input (command line or advanced UI params)
        if val_json:
            return val_json

        # fallback: if nothing has been returned until here
        if fallback is None and key_user_opts in self.kwargs_defaults:
            fallback = self.kwargs_defaults[key_user_opts]
        return fallback

    @property
    def DB_job_record(self):
        # type: () -> GMS_JOB
        if not self._DB_job_record:
            # check if job ID exists in database
            from ..misc.database_tools import GMS_JOB  # noqa F811  # redefinition of unused 'GMS_JOB' from line 22
            # from_job_ID() raises a ValueError in case the job ID does not exist in the database
            self._DB_job_record = GMS_JOB(self.conn_database).from_job_ID(self.ID)

        return self._DB_job_record

    @property
    def DB_config_table(self):
        # type: () -> dict
        """Returns the content of the config table of the postgreSQL database as dictionary."""
        if not self._DB_config_table:
            from ..misc.database_tools import get_info_from_postgreSQLdb
            db_cfg = dict(get_info_from_postgreSQLdb(self.conn_database, 'config', ['key', 'value']))

            # convert relative to absolute paths
            self._DB_config_table = {k: self.absP(v) if k.startswith('path_') and v.startswith('./') else v
                                     for k, v in db_cfg.items()}

        return self._DB_config_table

    def get_virtual_sensor_specs(self):
        # type: () -> dict
        """Returns the content of the virtual_sensors table of the postgreSQL database as dictionary."""
        from ..misc.database_tools import get_info_from_postgreSQLdb

        # column spectral_characteristics_datasetid is not used later because it's given by jobs.datasetid_spatial_ref
        cols2read = ['name', 'projection_epsg', 'spatial_resolution', 'spectral_characteristics_datasetid',
                     'wavelengths_pos', 'band_width']

        res = get_info_from_postgreSQLdb(self.conn_database, 'virtual_sensors',
                                         cols2read, {'id': self.virtual_sensor_id})[0]

        VSSpecs = dict()
        for i, col in enumerate(cols2read):
            val = res[i]
            if col == 'spectral_characteristics_datasetid' and val == -1:  # nodata value
                val = None

            VSSpecs[col] = val

        return VSSpecs

    def get_json_opts(self, validate=True):
        """Get a dictionary of GMS config parameters according to the jobs table of the database.

        NOTE: Reads the default options from options_default.json and updates the values with those from database.
        """
        # read options_default.json
        default_options = get_options(path_options_default, validation=validate)

        if 'json_config' in self.kwargs and self.kwargs['json_config']:
            if self.kwargs['json_config'].startswith("{"):
                try:
                    params_dict = json.loads(jsmin(self.kwargs['json_config']))
                except JSONDecodeError:
                    warnings.warn('The given JSON options string could not be decoded. '
                                  'JSON decoder failed with the following error:')
                    raise
            elif os.path.isfile(self.kwargs['json_config']):
                try:
                    with open(self.kwargs['json_config'], 'r') as inF:
                        params_dict = json.loads(jsmin(inF.read()))
                except JSONDecodeError:
                    warnings.warn('The given JSON options file %s could not be decoded. '
                                  'JSON decoder failed with the following error:' % self.kwargs['json_config'])
                    raise

            else:
                raise ValueError("The parameter 'json_config' must be a JSON formatted string or a JSON file on disk.")

            # convert values to useful data types and update the default values
            params_dict = json_to_python(params_dict)
            default_options.update(params_dict)

        # update default options with those from DB
        if self.DB_job_record.analysis_parameter:
            try:
                params_dict = json.loads(jsmin(self.DB_job_record.analysis_parameter))
            except JSONDecodeError:
                warnings.warn('The advanced options given in the WebUI could not be decoded. '
                              'JSON decoder failed with the following error:')
                raise

            # convert values to useful data types and update the default values
            params_dict = json_to_python(params_dict)
            default_options.update(params_dict)

        if validate:
            GMSValidator(allow_unknown=True, schema=gms_schema).validate(default_options)

        json_options = default_options
        return json_options

    @staticmethod
    def absP(relP):
        return os.path.abspath(os.path.join(os.path.dirname(__file__), relP))

    @staticmethod
    def joinP(*items):
        return os.path.join(*items)

    def get_data_list_of_current_jobID(self):
        """
        Get a list of datasets to be processed from database and return it together with some metadata.

        :return:    <list> of OrderedDicts, e.g. [OrderedDict([('proc_level', None), ('scene_ID', 5895940),
                    ('dataset_ID', 104), ('image_type', 'RSD'), ('satellite', 'Landsat-8'), ('sensor', 'OLI_TIRS'),
                    ('subsystem', ''), ('acq_datetime', datetime.datetime(2015, 2, 5, 10, 2, 52)),
                    ('entity_ID', 'LC81930242015036LGN00'), ('filename', 'LC81930242015036LGN00.tar.gz'),
                    ('sensormode', 'M')]), ...]
        """
        from ..model.metadata import get_sensormode
        data_list = []
        with psycopg2.connect(self.conn_database) as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
                cur.execute("""
                    WITH jobs_unnested AS (
                            SELECT id, unnest(sceneids) AS sceneid FROM jobs
                        )
                    SELECT jobs_unnested.sceneid,
                           scenes.datasetid,
                           scenes.acquisitiondate,
                           scenes.entityid,
                           scenes.filename,
                           COALESCE(scenes_proc.proc_level::text, 'L1A') AS proc_level,
                           datasets.image_type,
                           satellites.name AS satellite,
                           sensors.name AS sensor,
                           subsystems.name AS subsystem
                    FROM jobs_unnested
                    LEFT OUTER JOIN scenes ON scenes.id = jobs_unnested.sceneid
                    LEFT OUTER JOIN scenes_proc ON scenes_proc.sceneid = jobs_unnested.sceneid
                    LEFT OUTER JOIN datasets ON datasets.id = datasetid
                    LEFT OUTER JOIN satellites ON satellites.id = satelliteid
                    LEFT OUTER JOIN sensors ON sensors.id = sensorid
                    LEFT OUTER JOIN subsystems ON subsystems.id = subsystemid
                    WHERE jobs_unnested.id = %s
                    """,
                            (self.ID,))

                for row in cur.fetchall():
                    ds = OrderedDict()
                    ds["proc_level"] = row["proc_level"]
                    ds["scene_ID"] = row["sceneid"]
                    ds["dataset_ID"] = row["datasetid"]
                    ds["image_type"] = row["image_type"]
                    ds["satellite"] = row["satellite"]
                    ds["sensor"] = row["sensor"]
                    ds["subsystem"] = row["subsystem"]
                    ds["acq_datetime"] = row["acquisitiondate"]
                    ds["entity_ID"] = row["entityid"]
                    ds["filename"] = row["filename"]

                    ds['sensor'] = 'ETM+' if 'ETM' in ds['sensor'] else ds['sensor']  # normalize ETM sensor names
                    if self.skip_thermal and ds['subsystem'] == 'TIR':
                        continue  # removes ASTER TIR in case of skip_thermal
                    ds['subsystem'] = '' if ds['subsystem'] is None else ds['subsystem']
                    ds['sensormode'] = get_sensormode(ds)
                    if self.skip_pan and ds['sensormode'] == 'P':
                        continue  # removes e.g. SPOT PAN in case of skip_pan

                    if re.search("Sentinel-2A", ds['satellite'], re.I):
                        for subsystem in ['S2A10', 'S2A20', 'S2A60']:
                            sub_ds = ds.copy()
                            sub_ds['subsystem'] = subsystem
                            data_list.append(sub_ds)
                    elif re.search("Terra", ds['satellite'], re.I):
                        for subsystem in ['VNIR1', 'VNIR2', 'SWIR', 'TIR']:
                            sub_ds = ds.copy()
                            sub_ds['subsystem'] = subsystem
                            data_list.append(sub_ds)
                    else:
                        data_list.append(ds)

        self.data_list = data_list
        return self.data_list

    def validate_exec_configs(self):
        for i in ['L1AP', 'L1BP', 'L1CP', 'L2AP', 'L2BP', 'L2CP']:
            exec_lvl = getattr(self, 'exec_%s' % i)

            # check input format
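            # (i.e., a list of exactly 3 elements that contains only booleans or 0/1 integers)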
            if not all([len(exec_lvl) == 3,
                        (np.array(exec_lvl) == np.array(np.array(exec_lvl, bool), int)).all()]):
                raise ValueError('Execution mode must be provided as list of 3 elements containing only boolean '
                                 'values. Got %s for %s.' % (exec_lvl, i))

            # written output cannot be turned off in execution mode 'Python'
            if self.exec_mode == 'Python' and exec_lvl[1] == 0:
                warnings.warn("If job.exec_mode is set to 'Python' the output writer for %s has to be enabled "
                              "because any operations on GMS_obj.arr read the intermediate results from disk. "
                              "Turning it on..." % i)
                exec_lvl[1] = 1

    def to_dict(self):
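        """Return this JobConfig as a nested dictionary, structured like options_default.json,
        plus job metadata and the data list."""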
        opts_default = get_options(path_options_default)

        # add all keys included in options_default.json
        outdict = dict()
        for key in opts_default.keys():
            if not isinstance(opts_default[key], (dict, OrderedDict)):
                outdict[key] = getattr(self, key)
            else:
                group = key
                if group not in outdict:
                    outdict[group] = dict()

                for group_key in opts_default[group]:
                    if not isinstance(opts_default[group][group_key], (dict, OrderedDict)):
                        outdict[group][group_key] = getattr(self, group_key)
                    else:
                        subgroup = group_key
                        if subgroup not in outdict[group]:
                            outdict[group][subgroup] = dict()

                        for subgroup_key in opts_default[group][subgroup]:
                            try:
                                outdict[group][subgroup][subgroup_key] = getattr(self, subgroup_key)
                            except AttributeError:
                                procexec_keys = ['run_processor', 'write_output', 'delete_output']
                                if subgroup_key in procexec_keys:
                                    proc_code = subgroup
                                    outdict[group][subgroup][subgroup_key] = \
                                        getattr(self, 'exec_%sP' % proc_code)[procexec_keys.index(subgroup_key)]
                                else:
                                    raise

        # add job metadata and the data list
        outdict.update(dict(
            job_meta={k: getattr(self, k) for k in ['ID', 'start_time', 'end_time', 'computation_time', 'hostname']},
            data_list={'dataset_%s' % i: ds for i, ds in enumerate(self.data_list)}))

        return outdict

    def to_jsonable_dict(self):
        return python_to_json(self.to_dict())

    def __repr__(self):
        return pformat(self.to_dict())


def is_GMSConfig_available():
    try:
        return GMS_config.job is not None
    except (EnvironmentError, OSError):
        return False


def json_to_python(value):
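    """Convert a JSON-style value (possibly nested) to native Python types, e.g., "None" -> None, "2.5" -> 2.5.

    A doctest-style sketch of the conversion behaviour:
    >>> json_to_python({'a': 'None', 'b': '2.5', 'c': 'true', 'd': '3'})
    {'a': None, 'b': 2.5, 'c': True, 'd': 3}
    """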
    def is_number(s):
        try:
            float(s)
            return True
        except (ValueError, TypeError):
            return False

    if type(value) is dict:
        return {json_to_python(k): json_to_python(v) for k, v in value.items()}
    elif type(value) is list:
        return [json_to_python(v) for v in value]
    else:
        if value == "None":
            return None
        if value == "slice(None, None, None)":
            return slice(None)
        if value is True or value == "true":
            return True
        if value is False or value == "false":
            return False
        if is_number(value):
            try:
                if str(int(value)) != str(float(value)):
                    return int(value)
                else:
                    return float(value)
            except ValueError:
                return float(value)
        else:
            return value


def python_to_json(value):
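    """Inverse of json_to_python: convert Python values (incl. None, slice, booleans, datetimes and
    numpy arrays, possibly nested) to JSON-serializable equivalents."""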
    if type(value) in [dict, OrderedDict]:
        return {python_to_json(k): python_to_json(v) for k, v in value.items()}
    elif type(value) is list:
        return [python_to_json(v) for v in value]
    elif type(value) is np.ndarray:
        return [python_to_json(v) for v in value.tolist()]
    else:
        if value is None:
            return "None"
        if value == slice(None):
            return "slice(None, None, None)"
        if value is True:
            return "true"
        if value is False:
            return "false"
        if type(value) is datetime.datetime:
            return datetime.datetime.strftime(value, '%Y-%m-%d %H:%M:%S.%f%z')
        else:
            return value


class GMSValidator(Validator):
    def __init__(self, *args, **kwargs):
        """

        :param args:    Arguments to be passed to cerberus.Validator
        :param kwargs:  Keyword arguments to be passed to cerberus.Validator
        """
        super(GMSValidator, self).__init__(*args, **kwargs)

    def validate(self, document2validate, **kwargs):
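        """Validate the given document against the configured schema.

        In contrast to cerberus.Validator.validate(), this raises a ValueError if the document is malformed.
        """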
        if super(GMSValidator, self).validate(document=document2validate, **kwargs) is False:
            raise ValueError("Options is malformed: %s" % str(self.errors))


def get_options(target, validation=True):
    """
    return dictionary will all options
    :param validation: True / False, whether to validate options read from files ot not
    :param target: if path to file, then json is used to load, otherwise the default template
    is used
    :return: dictionary with options
    """

    if os.path.isfile(target):
        with open(target, "r") as fl:
            options = json_to_python(json.loads(jsmin(fl.read())))

        if validation is True:
            GMSValidator(allow_unknown=True, schema=gms_schema).validate(options)

        return options
    else:
        raise FileNotFoundError("target: %s is not a valid file path" % target)
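

# Usage sketch: load and validate the shipped default options (this mirrors what
# JobConfig.get_json_opts() does before applying user-defined overrides):
#
#     opts = get_options(path_options_default, validation=True)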