Commit 3e1d28bc authored by Daniel Scheffler

Added function to get jsonable dict from config. Revised options schema. Moved config and options to new submodule 'options'.


Former-commit-id: 81770264
parent 81d2763b
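The hunks below move the former `config` module into a new `options` subpackage (together with `options_schema` and `options_default.json`) and re-export `set_config` from the package root. A minimal usage sketch under that layout; the job ID is hypothetical and must exist in the jobs database:

    from gms_preprocessing import set_config

    # set_config builds a JobConfig and registers it globally (see below)
    config = set_config(job_ID=123456, db_host='localhost', CPUs=4)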
@@ -8,7 +8,7 @@ from . import algorithms # noqa: E402
from . import io # noqa: E402
from . import misc # noqa: E402
from . import processing # noqa: E402
from . import config # noqa: E402
from .options.config import set_config # noqa: E402
from .processing.process_controller import process_controller # noqa: E402
__author__ = """Daniel Scheffler"""
@@ -17,12 +17,13 @@ from json import JSONDecodeError
from jsmin import jsmin
from cerberus import Validator
import pkgutil
from pprint import pformat
from typing import TYPE_CHECKING
from .options_schema import gms_schema
if TYPE_CHECKING:
from .misc.database_tools import GMS_JOB # noqa F401 # flake8 issue
from gms_preprocessing.misc.database_tools import GMS_JOB # noqa F401 # flake8 issue
__author__ = 'Daniel Scheffler'
@@ -42,6 +43,10 @@ class GMS_configuration(object):
GMS_config = GMS_configuration()
path_options_default = os.path.join(os.path.dirname(pkgutil.get_loader("gms_preprocessing").path), 'options',
'options_default.json')
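The default options path is now computed once at module level via `pkgutil` instead of inside `get_options_from_database` (see the hunk further down). The same resolution pattern in isolation, assuming the package is importable:

    import os
    import pkgutil

    # .path of the loader points at the package's __init__.py; its dirname is the package root
    pkg_root = os.path.dirname(pkgutil.get_loader("gms_preprocessing").path)
    path_options_default = os.path.join(pkg_root, 'options', 'options_default.json')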
def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, json_config='', exec_L1AP=None,
exec_L1BP=None, exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO',
@@ -86,121 +91,6 @@ def set_config(job_ID, exec_mode='Python', db_host='localhost', reset=False, jso
builtins.GMS_JobConfig = JobConfig(job_ID, **kwargs)
return getattr(builtins, 'GMS_JobConfig')
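`set_config` registers the finished `JobConfig` on the `builtins` module, so any submodule can read the active configuration without circular imports; the `GMS_configuration` accessor above (its body is collapsed in this diff) presumably proxies to it. A hedged sketch of that pattern:

    import builtins

    class GMS_configuration(object):
        # hypothetical proxy body; the real implementation is collapsed above
        def __getattr__(self, attr):
            if hasattr(builtins, 'GMS_JobConfig'):
                return getattr(builtins.GMS_JobConfig, attr)
            raise EnvironmentError('Config has not been set yet. Run set_config() first!')

    GMS_config = GMS_configuration()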
# user_options = \
# {
# "exec_mode": exec_mode,
# "db_host": db_host,
# "CPUs": CPUs,
# "allow_subMultiprocessing": allow_subMultiprocessing,
# "disable_exception_handler": disable_exception_handler,
# "log_level": log_level,
# "tiling_block_size_XY": tiling_block_size_XY,
# "is_test": is_test,
# "profiling": profiling,
# "benchmark_global": benchmark_global,
# "paths": {
# # "path_fileserver": path_fileserver,
# "path_archive": path_archive,
# "path_procdata_scenes": path_procdata_scenes,
# "path_procdata_MGRS": path_procdata_MGRS,
# # "path_tempdir": path_tempdir,
# # "path_benchmarks": path_benchmarks,
# # "path_job_logs": path_job_logs,
# # "path_spatIdxSrv": path_spatIdxSrv,
# # "path_ac_tables": path_ac_tables,
# # "path_SNR_models": path_SNR_models,
# # "path_SRFs": path_SRFs,
# # "path_dem_proc_srtm_90m": path_dem_proc_srtm_90m,
# # "path_earthSunDist": path_earthSunDist,
# # "path_solar_irr": path_solar_irr,
# # "path_cloud_classif": path_cloud_classif,
# # "path_ECMWF_db": path_ECMWF_db
# },
#
# "processors": {
# "general_cfg": {
# "skip_thermal": skip_thermal,
# "skip_pan": skip_pan,
# "sort_bands_by_cwl": sort_bands_by_cwl,
# "conversion_type_optical": conversion_type_optical,
# "conversion_type_thermal": conversion_type_thermal,
# "scale_factor_TOARef": scale_factor_TOARef,
# "scale_factor_BOARef": scale_factor_BOARef,
# },
# "L1A_P": {
# "run_processor": exec_L1AP[0],
# "write_output": exec_L1AP[1],
# "delete_output": exec_L1AP[2],
# "SZA_SAA_calculation_accurracy": SZA_SAA_calculation_accurracy,
# "export_VZA_SZA_SAA_RAA_stats": export_VZA_SZA_SAA_RAA_stats
#
# },
# "L1B_P": {
# "run_processor": exec_L1BP[0],
# "write_output": exec_L1BP[1],
# "delete_output": exec_L1BP[2],
# "skip_coreg": skip_coreg,
# },
# "L1C_P": {
# "run_processor": exec_L1CP[0],
# "write_output": exec_L1CP[1],
# "delete_output": exec_L1CP[2],
# "cloud_masking_algorithm": {
# "Landsat-4": "FMASK",
# "Landsat-5": "FMASK",
# "Landsat-7": "FMASK",
# "Landsat-8": "FMASK",
# "Sentinel-2A": "SICOR",
# "Sentinel-2B": "SICOR"
# },
# "export_L1C_obj_dumps": export_L1C_obj_dumps,
# "scale_factor_errors_ac": 255,
# "auto_download_ecmwf": auto_download_ecmwf
# },
# "L2A_P": {
# "run_processor": exec_L2AP[0],
# "write_output": exec_L2AP[1],
# "delete_output": exec_L2AP[2],
# },
# "L2B_P": {
# "run_processor": exec_L2BP[0],
# "write_output": exec_L2BP[1],
# "delete_output": exec_L2BP[2],
# },
# "L2C_P": {
# "run_processor": exec_L2CP[0],
# "write_output": exec_L2CP[1],
# "delete_output": exec_L2CP[2],
# }
# },
# "usecase": {
# "virtual_sensor_id": virtual_sensor_id,
# "virtual_sensor_name": virtual_sensor_name,
# "datasetid_spatial_ref": datasetid_spatial_ref,
# "datasetid_spectral_ref": datasetid_spectral_ref,
# "target_CWL": [
#
# ], /*list of central wavelength positions of target sensor. Empty list means: use WebApp input.*/
# "target_FWHM": [
#
# ], /*list of band widths of target sensor. Empty list means: use WebApp input.*/
# "target_gsd": [
#
# ], /*X/Y pixel size of target sensor as list with two float/integer values*/
# "target_epsg_code": "None", /*target projection as EPSG code. "None": use projection of input data.*/
# "spatial_ref_gridx": [
#
# ], /*target sensor x-coordinate-grid. e.g. [15, 45]*/
# "spatial_ref_gridy": [
#
# ], /*target sensor y-coordinate-grid. e.g. [15, 45]*/
# "align_coord_grids": true, /*allows to force deactivation of image resampling*/
# "match_gsd": true
# }
# }
#
# }
#
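The commented-out options template above is removed; its role is taken over by `options_default.json`, validated against the revised `gms_schema` (imported from `options_schema` in this commit). A sketch of how such a cerberus check works, with a hypothetical miniature schema in the same style:

    from cerberus import Validator

    # hypothetical two-key excerpt in the style of gms_schema
    mini_schema = {
        'global_opts': {'type': 'dict', 'schema': {
            'exec_mode': {'type': 'string', 'allowed': ['Python', 'Flink']},
            'CPUs': {'type': 'integer', 'nullable': True},
        }}
    }

    v = Validator(mini_schema)
    assert v.validate({'global_opts': {'exec_mode': 'Python', 'CPUs': None}}), v.errors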
class JobConfig(object):
@@ -249,28 +139,30 @@ class JobConfig(object):
gp = self.get_parameter
###################
# general options #
###################
##################
# global options #
##################
json_globts = json_opts['global_opts'] # type: dict
self.exec_mode = \
gp('exec_mode', json_opts['exec_mode'])
gp('exec_mode', json_globts['exec_mode'])
self.CPUs = \
gp('CPUs', json_opts['CPUs'], fallback=multiprocessing.cpu_count())
gp('CPUs', json_globts['CPUs'], fallback=multiprocessing.cpu_count())
self.allow_subMultiprocessing = \
gp('allow_subMultiprocessing', json_opts['allow_subMultiprocessing'])
gp('allow_subMultiprocessing', json_globts['allow_subMultiprocessing'])
self.disable_exception_handler = \
gp('disable_exception_handler', json_opts['allow_subMultiprocessing'])
gp('disable_exception_handler', json_globts['disable_exception_handler'])
self.log_level = \
gp('log_level', json_opts['log_level'])
gp('log_level', json_globts['log_level'])
self.tiling_block_size_XY = \
gp('tiling_block_size_XY', json_opts['tiling_block_size_XY'])
gp('tiling_block_size_XY', json_globts['tiling_block_size_XY'])
self.is_test = \
gp('is_test', json_opts['is_test'])
gp('is_test', json_globts['is_test'])
self.profiling = \
gp('profiling', json_opts['profiling'])
gp('profiling', json_globts['profiling'])
self.benchmark_global = \
gp('benchmark_global', json_opts['benchmark_global'])
gp('benchmark_global', json_globts['benchmark_global'])
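All of these reads change from `json_opts[...]` to `json_globts[...]` because the revised schema nests the former top-level keys under a `global_opts` block. A hedged reconstruction of the nested structure, written as a Python dict and inferred purely from the keys read above:

    # assumed shape of the parsed options after this commit (values are the documented defaults)
    json_opts = {
        'global_opts': {
            'exec_mode': 'Python',
            'CPUs': None,
            'allow_subMultiprocessing': True,
            'disable_exception_handler': True,
            'log_level': 'INFO',
            'tiling_block_size_XY': [2048, 2048],
            'is_test': False,
            'profiling': False,
            'benchmark_global': False,
        }
    }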
#########
# paths #
@@ -353,54 +245,54 @@ class JobConfig(object):
# L1A
self.exec_L1AP = gp('exec_L1AP', [
json_processors['L1A_P']['run_processor'],
json_processors['L1A_P']['write_output'],
json_processors['L1A_P']['delete_output']])
json_processors['L1A']['run_processor'],
json_processors['L1A']['write_output'],
json_processors['L1A']['delete_output']])
self.SZA_SAA_calculation_accurracy = \
gp('SZA_SAA_calculation_accurracy', json_processors['L1A_P']['SZA_SAA_calculation_accurracy'])
gp('SZA_SAA_calculation_accurracy', json_processors['L1A']['SZA_SAA_calculation_accurracy'])
self.export_VZA_SZA_SAA_RAA_stats = \
gp('export_VZA_SZA_SAA_RAA_stats', json_processors['L1A_P']['SZA_SAA_calculation_accurracy'])
gp('export_VZA_SZA_SAA_RAA_stats', json_processors['L1A']['export_VZA_SZA_SAA_RAA_stats'])
# L1B
self.exec_L1BP = gp('exec_L1BP', [
json_processors['L1B_P']['run_processor'],
json_processors['L1B_P']['write_output'],
json_processors['L1B_P']['delete_output']])
self.skip_coreg = gp('skip_coreg', json_processors['L1B_P']['skip_coreg'])
json_processors['L1B']['run_processor'],
json_processors['L1B']['write_output'],
json_processors['L1B']['delete_output']])
self.skip_coreg = gp('skip_coreg', json_processors['L1B']['skip_coreg'])
# L1C
self.exec_L1CP = gp('exec_L1CP', [
json_processors['L1C_P']['run_processor'],
json_processors['L1C_P']['write_output'],
json_processors['L1C_P']['delete_output']])
json_processors['L1C']['run_processor'],
json_processors['L1C']['write_output'],
json_processors['L1C']['delete_output']])
self.cloud_masking_algorithm = \
gp('cloud_masking_algorithm', json_processors['L1C_P']['cloud_masking_algorithm'])
gp('cloud_masking_algorithm', json_processors['L1C']['cloud_masking_algorithm'])
self.export_L1C_obj_dumps = \
gp('export_L1C_obj_dumps', json_processors['L1C_P']['export_L1C_obj_dumps'])
gp('export_L1C_obj_dumps', json_processors['L1C']['export_L1C_obj_dumps'])
self.scale_factor_errors_ac = \
gp('scale_factor_errors_ac', json_processors['L1C_P']['scale_factor_errors_ac'])
gp('scale_factor_errors_ac', json_processors['L1C']['scale_factor_errors_ac'])
self.auto_download_ecmwf = \
gp('auto_download_ecmwf', json_processors['L1C_P']['auto_download_ecmwf'])
gp('auto_download_ecmwf', json_processors['L1C']['auto_download_ecmwf'])
# L2A
self.exec_L2AP = gp('exec_L2AP', [
json_processors['L2A_P']['run_processor'],
json_processors['L2A_P']['write_output'],
json_processors['L2A_P']['delete_output']])
self.align_coord_grids = gp('align_coord_grids', json_processors['L2A_P']['align_coord_grids'])
self.match_gsd = gp('match_gsd', json_processors['L2A_P']['match_gsd'])
json_processors['L2A']['run_processor'],
json_processors['L2A']['write_output'],
json_processors['L2A']['delete_output']])
self.align_coord_grids = gp('align_coord_grids', json_processors['L2A']['align_coord_grids'])
self.match_gsd = gp('match_gsd', json_processors['L2A']['match_gsd'])
# L2B
self.exec_L2BP = gp('exec_L2BP', [
json_processors['L2B_P']['run_processor'],
json_processors['L2B_P']['write_output'],
json_processors['L2B_P']['delete_output']])
json_processors['L2B']['run_processor'],
json_processors['L2B']['write_output'],
json_processors['L2B']['delete_output']])
# L2C
self.exec_L2CP = gp('exec_L2CP', [
json_processors['L2C_P']['run_processor'],
json_processors['L2C_P']['write_output'],
json_processors['L2C_P']['delete_output']])
json_processors['L2C']['run_processor'],
json_processors['L2C']['write_output'],
json_processors['L2C']['delete_output']])
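Every processor key loses its `_P` suffix (`L1A_P` becomes `L1A`, and so on), while each block keeps the same `run_processor` / `write_output` / `delete_output` triple. A hypothetical helper that would collapse the repeated pattern above, assuming the new key style:

    def get_exec_triple(json_processors, level):
        # level is e.g. 'L1A' (new key style, without the '_P' suffix)
        proc = json_processors[level]
        return [proc['run_processor'], proc['write_output'], proc['delete_output']]

    # e.g. self.exec_L1AP = gp('exec_L1AP', get_exec_triple(json_processors, 'L1A'))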
################################
# target sensor specifications #
@@ -415,14 +307,17 @@ class JobConfig(object):
# spectral specifications
self.datasetid_spectral_ref = VSSpecs['spectral_characteristics_datasetid']
# FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
self.target_CWL = VSSpecs['wavelengths_pos']
# FIXME column is empty if a known datasetid is chosen as spectral characteristics of the virtual sensor:
self.target_FWHM = VSSpecs['band_width']
# spatial specifications
target_gsd_tmp = VSSpecs['spatial_resolution'] # table features only 1 value for X/Y-dims FIXME user inputs?
# FIXME target GSD setting is a duplicate to datasetid_spatial_ref!
self.target_gsd = xgsd, ygsd = \
[target_gsd_tmp]*2 if isinstance(target_gsd_tmp, (int, float)) else target_gsd_tmp
self.EPSG = VSSpecs['projection_epsg']
self.target_epsg_code = VSSpecs['projection_epsg']
# FIXME values in case user defines only Landsat?
self.spatial_ref_gridx = np.arange(xgsd / 2., xgsd / 2. + 2 * xgsd, xgsd) # e.g. [15, 45]
self.spatial_ref_gridy = np.arange(ygsd / 2., ygsd / 2. + 2 * ygsd, ygsd)
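For a 30 m GSD this reproduces the `[15, 45]` grid from the inline comment, since `np.arange` excludes the stop value `xgsd/2 + 2*xgsd = 75`. A quick check under that assumption:

    import numpy as np

    xgsd = 30.0
    grid = np.arange(xgsd / 2., xgsd / 2. + 2 * xgsd, xgsd)  # start=15.0, stop=75.0 (exclusive), step=30.0
    assert grid.tolist() == [15.0, 45.0]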
@@ -433,6 +328,12 @@ class JobConfig(object):
self.data_list = self.get_data_list_of_current_jobID()
############
# validate #
############
self.validate_exec_configs()
@property
def kwargs_defaults(self):
if not self._kwargs_defaults:
@@ -441,6 +342,20 @@ class JobConfig(object):
return self._kwargs_defaults
def get_init_argskwargs(self, ignore=("logger",)):
"""
Return a dict that separates the calling function's positional arguments ('args') from its
keyword arguments ('kwargs').
"""
posname, kwname, argskwargs = getargvalues(stack()[1][0])[-3:]
argskwargs.update(argskwargs.pop(kwname, []))
argskwargs = {k: v for k, v in argskwargs.items() if k not in ignore and k != 'self' and not k.startswith('__')}
sig = signature(self.__init__)
argsnames = [k for k in sig.parameters if sig.parameters[k].default == _empty]
return {'args': {k: v for k, v in argskwargs.items() if k in argsnames},
'kwargs': {k: v for k, v in argskwargs.items() if k not in argsnames}}
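`get_init_argskwargs` inspects the caller's frame with `inspect.stack()` and then splits the caller's locals into parameters without defaults ('args') and those with defaults ('kwargs'). The frame-inspection part in isolation, as a self-contained sketch:

    from inspect import getargvalues, stack

    def caller_locals():
        # frame 0 is this function itself, frame 1 is whoever called it
        return getargvalues(stack()[1][0]).locals

    def demo(a, b=2):
        return caller_locals()

    assert demo(1) == {'a': 1, 'b': 2}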
def get_parameter(self, key_user_opts, val_json=None, attr_db_job_record='', fallback=None):
# 1. JobConfig parameters: parameters that are directly passed to JobConfig
if key_user_opts in self.kwargs and self.kwargs[key_user_opts] != self.kwargs_defaults[key_user_opts]:
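`get_parameter` resolves each option from up to four sources; only the first branch is visible here. A hedged sketch of the full precedence chain, inferred from the signature (the real body is collapsed in this diff):

    def get_parameter(self, key_user_opts, val_json=None, attr_db_job_record='', fallback=None):
        # 1. keyword explicitly passed to set_config (and differing from its default)
        if key_user_opts in self.kwargs and self.kwargs[key_user_opts] != self.kwargs_defaults[key_user_opts]:
            return self.kwargs[key_user_opts]
        # 2. value from the JSON options (hypothetical ordering for steps 2-4)
        if val_json is not None:
            return val_json
        # 3. attribute of the database job record
        if attr_db_job_record and hasattr(self.DB_job_record, attr_db_job_record):
            return getattr(self.DB_job_record, attr_db_job_record)
        # 4. fallback default
        return fallback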
@@ -464,7 +379,7 @@ class JobConfig(object):
# type: () -> GMS_JOB
if not self._DB_job_record:
# check if job ID exists in database
from .misc.database_tools import GMS_JOB # noqa F811 # redefinition of unused 'GMS_JOB' from line 22
from ..misc.database_tools import GMS_JOB # noqa F811 # redefinition of unused 'GMS_JOB' from line 22
try:
self._DB_job_record = GMS_JOB(self.conn_database).from_job_ID(self.ID)
except ValueError:
@@ -477,7 +392,7 @@ class JobConfig(object):
# type: () -> dict
"""Returns the content of the config table of the postgreSQL database as dictionary."""
if not self._DB_config_table:
from .misc.database_tools import get_info_from_postgreSQLdb
from ..misc.database_tools import get_info_from_postgreSQLdb
db_cfg = dict(get_info_from_postgreSQLdb(self.conn_database, 'config', ['key', 'value']))
# convert relative to absolute paths
@@ -489,7 +404,7 @@ class JobConfig(object):
def get_virtual_sensor_specs(self):
# type: () -> dict
"""Returns the content of the virtual_sensors table of the postgreSQL database as dictionary."""
from .misc.database_tools import get_info_from_postgreSQLdb
from ..misc.database_tools import get_info_from_postgreSQLdb
# column spectral_characteristics_datasetid is not used later because its given by jobs.datasetid_spatial_ref
cols2read = ['name', 'projection_epsg', 'spatial_resolution', 'spectral_characteristics_datasetid',
@@ -514,8 +429,7 @@ class JobConfig(object):
NOTE: Reads the default options from options_default.json and updates the values with those from database.
"""
# read options_default.json
default_options = get_options(os.path.join(os.path.dirname(pkgutil.get_loader("gms_preprocessing").path),
'options_default.json'), validation=validate)
default_options = get_options(path_options_default, validation=validate)
if 'json_config' in self.kwargs and self.kwargs['json_config']:
if self.kwargs['json_config'].startswith("{"):
@@ -568,20 +482,6 @@ class JobConfig(object):
def joinP(*items):
return os.path.join(*items)
def get_init_argskwargs(self, ignore=("logger",)):
"""
Return a dict that separates the calling function's positional arguments ('args') from its
keyword arguments ('kwargs').
"""
posname, kwname, argskwargs = getargvalues(stack()[1][0])[-3:]
argskwargs.update(argskwargs.pop(kwname, []))
argskwargs = {k: v for k, v in argskwargs.items() if k not in ignore and k != 'self' and not k.startswith('__')}
sig = signature(self.__init__)
argsnames = [k for k in sig.parameters if sig.parameters[k].default == _empty]
return {'args': {k: v for k, v in argskwargs.items() if k in argsnames},
'kwargs': {k: v for k, v in argskwargs.items() if k not in argsnames}}
def get_data_list_of_current_jobID(self):
"""
Get a list of datasets to be processed from database and return it together with some metadata.
@@ -592,7 +492,7 @@ class JobConfig(object):
('entity_ID', 'LC81930242015036LGN00'), ('filename', 'LC81930242015036LGN00.tar.gz'),
('sensormode', 'M'), ('logger', None)]), ...]
"""
from .model.metadata import get_sensormode
from ..model.metadata import get_sensormode
data_list = []
with psycopg2.connect(self.conn_database) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
@@ -658,191 +558,6 @@ class JobConfig(object):
self.data_list = data_list
return self.data_list
class Job(object):
def __init__(self, ID, exec_mode='Python', db_host='localhost', exec_L1AP=None, exec_L1BP=None,
exec_L1CP=None, exec_L2AP=None, exec_L2BP=None, exec_L2CP=None, CPUs=None,
allow_subMultiprocessing=True, disable_exception_handler=True, log_level='INFO',
tiling_block_size_XY=(2048, 2048), is_test=False, profiling=False, benchmark_global=False,
path_procdata_scenes=None, path_procdata_MGRS=None, path_archive=None):
"""Create a job configuration
:param ID: job ID of the job to be executed, e.g. 123456 (must be present in database)
:param exec_mode: 'Python': writes intermediate results to disk in order to save memory
'Flink': keeps intermediate results in memory in order to save IO time
:param db_host: host name of the server that runs the postgreSQL database
:param exec_L1AP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L1BP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L1CP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L2AP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L2BP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param exec_L2CP: list of 3 elements: [run processor, write output, delete output if not needed anymore]
:param CPUs: number of CPU cores to be used for processing (default: None -> use all available)
:param allow_subMultiprocessing:
allow multiprocessing within workers
:param disable_exception_handler:
enable/disable automatic handling of unexpected exceptions (default: True -> enabled)
:param log_level: the logging level to be used (choices: 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL';
default: 'INFO')
:param tiling_block_size_XY:
X/Y block size to be used for any tiling process (default: (2048, 2048))
:param is_test: whether the current job represents a software test job (run by a test runner) or not
(default=False)
:param profiling: enable/disable code profiling (default: False)
:param benchmark_global:
enable/disable benchmark of the whole processing pipeline
:param path_procdata_scenes:
output path to store processed scenes
:param path_procdata_MGRS:
output path to store processed MGRS tiles
:param path_archive: input path where downloaded data are stored
"""
# private attributes
self._DB_config = None
self._DB_job_record = None
# args
self.ID = ID
self.exec_mode = exec_mode
assert exec_mode in ['Flink', 'Python']
self.db_host = db_host
assert isinstance(db_host, str), "'db_host' must be a string! Got %s." % type(db_host)
# kwargs
# processor configuration: [run processor, write output, delete output if not needed anymore]
self.exec_L1AP = exec_L1AP or [1, 1, 1]
self.exec_L1BP = exec_L1BP or [1, 1, 1]
self.exec_L1CP = exec_L1CP or [1, 1, 1]
self.exec_L2AP = exec_L2AP or [1, 1, 1]
self.exec_L2BP = exec_L2BP or [1, 1, 0]
self.exec_L2CP = exec_L2CP or [1, 1, 0]
self.skip_coreg = False
self.validate_exec_configs()
self.CPUs = CPUs if CPUs is not None else multiprocessing.cpu_count()
self.allow_subMultiprocessing = allow_subMultiprocessing
self.disable_exception_handler = disable_exception_handler is False
self.log_level = log_level
self.tiling_block_size_XY = tiling_block_size_XY
self.is_test = is_test
self.profiling = profiling
self.benchmark_global = benchmark_global
# fixed attributes
# possible values: 'pending', 'running', 'canceled', 'failed', 'finished_with_warnings',
# 'finished_with_errors', 'finished'
self.status = 'pending'
self.start_time = datetime.datetime.now()
self.end_time = None
self.computation_time = None
self.hostname = socket.gethostname()
# set all the default paths
# TODO: HOW TO DEAL WITH THESE ATTRIBUTES IN TEST MODE?
self.path_spatIdxSrv = self.DB_config['path_spatial_index_mediator_server']
self.path_tempdir = self.DB_config['path_tempdir']
self.path_ac_tables = self.DB_config['path_ac_tables']
self.path_SNR_models = self.DB_config['path_SNR_models']
self.path_dem_proc_srtm_90m = self.DB_config['path_dem_proc_srtm_90m']
if not self.is_test:
self.path_fileserver = self.DB_config['path_data_root']
self.path_archive = self.joinP(self.path_fileserver, self.DB_config['foldername_download'])
self.path_procdata_scenes = self.joinP(self.path_fileserver, self.DB_config['foldername_procdata_scenes'])
self.path_procdata_MGRS = self.joinP(self.path_fileserver, self.DB_config['foldername_procdata_MGRS'])
self.path_earthSunDist = self.DB_config['path_earthSunDist']
self.path_SRFs = self.DB_config['path_SRFs']
self.path_cloud_classif = self.DB_config['path_cloud_classif']
self.path_solar_irr = self.DB_config['path_solar_irr']
self.path_ECMWF_db = self.DB_config['path_ECMWF_db']
self.path_benchmarks = self.DB_config['path_benchmarks']
self.path_job_logs = self.DB_config['path_job_logs']
else:
# in test mode, the repository should be self-contained -> use only relative paths
self.path_fileserver = self.absP('../tests/data/')
self.path_archive = self.absP('../tests/data/archive_data/')
self.path_procdata_scenes = self.absP('../tests/data/output_scenes/')
self.path_procdata_MGRS = self.absP('../tests/data/output_mgrs_tiles/')
self.path_earthSunDist = self.absP('./database/earth_sun_distance/Earth_Sun_distances_per_day_edited.csv')
self.path_SRFs = self.absP('./database/srf/')
self.path_cloud_classif = self.absP('./database/cloud_classifier/')
self.path_solar_irr = self.absP(
'./database/solar_irradiance/SUNp1fontenla__350-2500nm_@0.1nm_converted.txt')
self.path_ECMWF_db = self.absP('../tests/data/processed_ECMWF/')
self.path_benchmarks = self.absP('./benchmarks/')
self.path_job_logs = self.absP('./logs/job_logs/')
# overwrite defaults with user provided keyword arguments
kwargs = self.get_init_argskwargs()['kwargs']
defaults = self.get_init_kwdefaults()
for kwName, kwVal in kwargs.items():
if not hasattr(self, kwName):
from .misc.exceptions import GMSConfigParameterError
raise GMSConfigParameterError("'%s' is not a valid parameter for config.Job." % kwName)
else:
if kwVal != defaults[kwName]:
setattr(self, kwName, kwVal)
# create missing output directories
if not os.path.isdir(self.path_job_logs):
os.makedirs(self.path_job_logs)
@property
def conn_database(self):
return "dbname='geomultisens' user='gmsdb' password='gmsdb' host='%s' connect_timeout=3" % self.db_host
def get_init_argskwargs(self, ignore=("logger",)):
"""
Return a dict that separates the calling function's positional arguments ('args') from its
keyword arguments ('kwargs').
"""
posname, kwname, argskwargs = getargvalues(stack()[1][0])[-3:]
argskwargs.update(argskwargs.pop(kwname, []))
argskwargs = {k: v for k, v in argskwargs.items() if k not in ignore and k != 'self' and not k.startswith('__')}
sig = signature(self.__init__)
argsnames = [k for k in sig.parameters if sig.parameters[k].default == _empty]
return {'args': {k: v for k, v in argskwargs.items() if k in argsnames},
'kwargs': {k: v for k, v in argskwargs.items() if k not in argsnames}}
def get_init_kwdefaults(self):
a = getfullargspec(self.__init__)
return dict(zip(a.args[-len(a.defaults):], a.defaults))
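`getfullargspec` stores defaults only for the last `len(defaults)` parameters, so zipping the tail of `args` with `defaults` recovers the keyword defaults. A quick illustration:

    from inspect import getfullargspec

    def f(a, b, c=3, d=4):
        pass

    spec = getfullargspec(f)
    kwdefaults = dict(zip(spec.args[-len(spec.defaults):], spec.defaults))
    assert kwdefaults == {'c': 3, 'd': 4}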
@staticmethod
def absP(relP):
return os.path.abspath(os.path.join(os.path.dirname(__file__), relP))
@staticmethod
def joinP(*items):
return os.path.join(*items)
@property
def DB_config(self):
if not self._DB_config:
from .misc.database_tools import get_info_from_postgreSQLdb
db_cfg = dict(get_info_from_postgreSQLdb(self.conn_database, 'config', ['key', 'value']))
# convert relative to absolute paths
self._DB_config = {k: self.absP(v) if k.startswith('path_') and v.startswith('./') else v
for k, v in db_cfg.items()}
return self._DB_config
@property
def DB_job_record(self):
if not self._DB_job_record:
# check if job ID exists in database
from .misc.database_tools import GMS_JOB # noqa F811 # redefinition of unused 'GMS_JOB' from line 22
try:
self._DB_job_record = GMS_JOB(self.conn_database).from_job_ID(self.ID)
except ValueError:
raise
return self._DB_job_record
def validate_exec_configs(self):
for i in ['L1AP', 'L1BP', 'L1CP', 'L2AP', 'L2BP', 'L2CP']:
exec_lvl = getattr(self, 'exec_%s' % i)
@@ -860,148 +575,53 @@ class Job(object):
"Turning it on.." % i)
exec_lvl[1] = 1
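The validation loop (partially collapsed here) walks all six `exec_*` triples and forces `write_output` on where a disabled output would break the pipeline. A hedged reconstruction of the visible part:

    import warnings

    def validate_exec_configs(cfg):
        # hypothetical reconstruction; only the warning branch is visible in this hunk
        for lvl in ['L1AP', 'L1BP', 'L1CP', 'L2AP', 'L2BP', 'L2CP']:
            exec_lvl = getattr(cfg, 'exec_%s' % lvl)  # [run, write_output, delete_output]
            if exec_lvl[0] and not exec_lvl[1]:
                warnings.warn("%s is set to run without writing its output. Turning it on.." % lvl)
                exec_lvl[1] = 1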
def to_dict(self):
opts_default = get_options(path_options_default)
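`to_dict` is the new 'jsonable dict' function announced in the commit message; apart from loading the defaults its body is collapsed here. A minimal sketch of what a JSON-able export could look like, assuming it overwrites the default options tree with the live attribute values:

    import json

    def to_dict(cfg, opts_default):
        # hypothetical: mirror the options_default.json structure and replace the
        # defaults with the current config values; tuples become lists for JSON
        out = dict(opts_default)
        out['global_opts'] = {
            'exec_mode': cfg.exec_mode,
            'CPUs': cfg.CPUs,
            'tiling_block_size_XY': list(cfg.tiling_block_size_XY),
        }
        return out

    # json.dumps(to_dict(GMS_config, opts_default)) should then succeed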
class Usecase:
def __init__(self, _job):
self._job = _job