pipeline.py 17.1 KB
Newer Older
1
2
# -*- coding: utf-8 -*-

3
4
# gms_preprocessing, spatial and spectral homogenization of satellite remote sensing data
#
5
# Copyright (C) 2020  Daniel Scheffler (GFZ Potsdam, daniel.scheffler@gfz-potsdam.de)
6
7
8
9
10
11
#
# This software was developed within the context of the GeoMultiSens project funded
# by the German Federal Ministry of Education and Research
# (project grant code: 01 IS 14 010 A-C).
#
# This program is free software: you can redistribute it and/or modify it under
12
# the terms of the GNU General Public License as published by the Free Software
13
14
# Foundation, either version 3 of the License, or (at your option) any later version.
# Please note the following exception: `gms_preprocessing` depends on tqdm, which
15
16
17
# is distributed under the Mozilla Public Licence (MPL) v2.0 except for the files
# "tqdm/_tqdm.py", "setup.py", "README.rst", "MANIFEST.in" and ".gitignore".
# Details can be found here: https://github.com/tqdm/tqdm/blob/master/LICENCE.
18
19
20
21
22
23
24
25
26
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

Daniel Scheffler's avatar
Daniel Scheffler committed
27
from typing import List, Tuple, Generator, Iterable, Union  # noqa F401  # flake8 issue
28
from psutil import virtual_memory
29

30
from ..options.config import GMS_config as CFG
31
from ..misc import exception_handler as EXC_H
Daniel Scheffler's avatar
Daniel Scheffler committed
32
from ..misc.path_generator import path_generator
33
from ..misc.logging import GMS_logger
34
from ..misc.locks import ProcessLock, MemoryReserver, redis_conn
35
36
37
38
39
40
from ..algorithms import L1A_P
from ..algorithms import L1B_P
from ..algorithms import L1C_P
from ..algorithms import L2A_P
from ..algorithms import L2B_P
from ..algorithms import L2C_P
41
from ..model.gms_object import \
42
    failed_GMS_object, update_proc_status, return_proc_reports_only, estimate_mem_usage
Daniel Scheffler's avatar
Daniel Scheffler committed
43
from ..model.gms_object import GMS_object  # noqa F401  # flake8 issue
44
from ..algorithms.geoprocessing import get_common_extent
45

46
47
__author__ = 'Daniel Scheffler'

48
49

@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map(dataset_dict):  # map (scene-wise parallelization)
    # type: (dict) -> L1A_P.L1A_object
    """Run the complete Level-1A chain for one scene (non-tiled variant).

    Imports raster data and metadata, converts to TOA radiance/reflectance/
    temperature, derives scene geometry attributes and optionally writes
    ENVI output.

    :param dataset_dict: keyword arguments used to instantiate the L1A object
    :return: the fully processed L1A object
    """
    scene = L1A_P.L1A_object(**dataset_dict)

    # wait until enough system resources are available
    scene.block_at_system_overload(max_usage=CFG.critical_mem_usage)

    # data / metadata import and validation
    scene.import_rasterdata()
    scene.import_metadata()
    scene.validate_GeoTransProj_GeoAlign()  # sets self.GeoTransProj_ok and self.GeoAlign_ok
    scene.apply_nodata_mask_to_ObjAttr('arr')  # nodata mask is automatically calculated

    # radiometric conversion
    scene.add_rasterInfo_to_MetaObj()
    scene.reference_data('UTM')
    scene.calc_TOARadRefTemp()

    # scene geometry attributes
    scene.calc_corner_positions()  # requires mask_nodata
    scene.calc_center_AcqTime()  # (if necessary); requires corner positions
    scene.calc_mean_VAA()
    scene.calc_orbit_overpassParams()  # requires corner positions
    scene.apply_nodata_mask_to_ObjAttr('mask_clouds', 0)

    # optional output and cleanup
    if CFG.exec_L1AP[1]:
        scene.to_ENVI(CFG.write_ENVIclassif_cloudmask)
    scene.record_mem_usage()
    scene.delete_tempFiles()

    return scene

76

77
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_1(dataset_dict, block_size=None):  # map (scene-wise parallelization)
    # type: (dict, tuple) -> List[L1A_P.L1A_object]
    """First part of the tiled L1A chain: import one scene and cut it into tiles.

    :param dataset_dict: keyword arguments used to instantiate the L1A object
    :param block_size:   X/Y tile size in pixels; falls back to CFG.tiling_block_size_XY
    :return: list of non-empty L1A tiles (input for block-wise parallelization)
    """
    scene = L1A_P.L1A_object(**dataset_dict)
    scene.block_at_system_overload(max_usage=CFG.critical_mem_usage)
    scene.import_rasterdata()
    scene.import_metadata()
    scene.validate_GeoTransProj_GeoAlign()  # sets self.GeoTransProj_ok and self.GeoAlign_ok
    scene.apply_nodata_mask_to_ObjAttr('arr')  # nodata mask is automatically calculated
    scene.add_rasterInfo_to_MetaObj()
    scene.reference_data('UTM')

    # cut (block-wise parallelization); to_tiles() returns None for tiles
    # that contain no data at all, so those are filtered out here
    blocksize = block_size if block_size else CFG.tiling_block_size_XY
    return [tile for tile in scene.to_tiles(blocksize=blocksize) if tile is not None]

96

97
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_2(L1A_tile):  # map (block-wise parallelization)
    # type: (L1A_P.L1A_object) -> L1A_P.L1A_object
    """Second part of the tiled L1A chain: radiometric conversion of one tile.

    :param L1A_tile: a single L1A tile as produced by L1A_map_1
    :return: the converted tile
    """
    L1A_tile.calc_TOARadRefTemp()

    # persist the intermediate result unless everything is kept in memory
    if not CFG.inmem_serialization:
        L1A_tile.to_ENVI(CFG.write_ENVIclassif_cloudmask, is_tempfile=True)

    return L1A_tile

106

107
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1A_map_3(L1A_obj):  # map (scene-wise parallelization)
    # type: (L1A_P.L1A_object) -> L1A_P.L1A_object
    """Third part of the tiled L1A chain: finalize the re-merged scene.

    Derives geometry attributes, applies the cloud nodata mask and optionally
    writes ENVI output.

    :param L1A_obj: L1A object merged from the tiles produced by L1A_map_2
    :return: the finalized L1A object
    """
    L1A_obj.calc_corner_positions()  # requires mask_nodata
    L1A_obj.calc_center_AcqTime()  # (if necessary); requires corner positions
    L1A_obj.calc_mean_VAA()
    L1A_obj.calc_orbit_overpassParams()  # requires corner positions
    L1A_obj.apply_nodata_mask_to_ObjAttr('mask_clouds', 0)

    if CFG.exec_L1AP[1]:
        L1A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)

    # temp files must be removed whether or not ENVI output was written
    # (previously duplicated in both branches of the if/else above)
    L1A_obj.delete_tempFiles()
    L1A_obj.record_mem_usage()

    return L1A_obj

124

125
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1B_map(L1A_obj):
    # type: (L1A_P.L1A_object) -> L1B_P.L1B_object
    """Run Level-1B processing (computation of global coregistration shifts).

    NOTE: In the Python (as opposed to the inmem_serialization) implementation,
    L1A_obj contains NO array data — only the metadata valid for the whole scene.
    """
    L1A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)

    L1B_obj = L1B_P.L1B_object(L1A_obj)
    L1B_obj.compute_global_shifts()

    if CFG.exec_L1BP[1]:
        L1B_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)

    L1B_obj.record_mem_usage()
    L1B_obj.delete_tempFiles()
    return L1B_obj

143

144
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L1C_map(L1B_objs):
    # type: (Iterable[L1B_P.L1B_object]) -> List[L1C_P.L1C_object]
    """Atmospheric correction.

    NOTE: all subsystems (containing all spatial samplings) belonging to the same scene ID are needed

    :param L1B_objs: list containing one or multiple L1B objects belonging to the same scene ID.
    """
    # materialize the iterable exactly once: the previous `list(L1B_objs)[0]`
    # exhausted generator inputs before the comprehension below ever saw them
    L1B_objs = list(L1B_objs)
    L1B_objs[0].block_at_system_overload(max_usage=CFG.critical_mem_usage)

    # initialize L1C objects
    L1C_objs = [L1C_P.L1C_object(L1B_obj) for L1B_obj in L1B_objs]

    # check in config if atmospheric correction is desired
    if CFG.target_radunit_optical == 'BOA_Ref':
        # atmospheric correction (asserts that there is an ac_options.json file on disk for the current sensor)
        if L1C_objs[0].ac_options:
            # perform atmospheric correction
            L1C_objs = L1C_P.AtmCorr(*L1C_objs).run_atmospheric_correction(dump_ac_input=False)
        else:
            for L1C_obj in L1C_objs:
                L1C_obj.logger.warning('Atmospheric correction is not yet supported for %s %s and has been skipped.'
                                       % (L1C_obj.satellite, L1C_obj.sensor))
    else:
        for L1C_obj in L1C_objs:
            L1C_obj.logger.warning('Atmospheric correction skipped because optical conversion type is set to %s.'
                                   % CFG.target_radunit_optical)

    # write outputs and delete temporary data
    for L1C_obj in L1C_objs:
        if CFG.exec_L1CP[1]:
            L1C_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)
        if L1C_obj.arr_shape == 'cube':
            L1C_obj.delete_tempFiles()
        L1C_obj.delete_ac_input_arrays()

    for L1C_obj in L1C_objs:
        L1C_obj.record_mem_usage()

    return L1C_objs


@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2A_map(L1C_objs, block_size=None, return_tiles=True):
    # type: (Union[List[L1C_P.L1C_object], Tuple[L1C_P.L1C_object]], tuple, bool) -> Union[List[L2A_P.L2A_object], L2A_P.L2A_object]  # noqa
    """Geometric homogenization.

    Performs correction of geometric displacements, resampling to target grid of the usecase and merges multiple
    GMS objects belonging to the same scene ID (subsystems) to a single L2A object.
    NOTE: Output L2A_object must be cut into tiles because L2A_obj size in memory exceeds maximum serialization size.

    :param L1C_objs:        list containing one or multiple L1C objects belonging to the same scene ID.
    :param block_size:      X/Y size of output tiles in pixels, e.g. (1024,1024)
    :param return_tiles:    return computed L2A object in tiles
    :return:                list of L2A_object tiles
    """
    L1C_objs[0].block_at_system_overload(max_usage=CFG.critical_mem_usage)

    # initialize L2A objects
    L2A_objs = [L2A_P.L2A_object(L1C_obj) for L1C_obj in L1C_objs]

    # get common extent (NOTE: using lon/lat coordinates here would cause black borders around the UTM image
    #                          because get_common_extent() just uses the envelop without regard to the projection
    clip_extent = \
        get_common_extent([obj.trueDataCornerUTM for obj in L2A_objs]) \
        if len(L2A_objs) > 1 else L2A_objs[0].trueDataCornerUTM

    # correct geometric displacements and resample to target grid
    for L2A_obj in L2A_objs:
        L2A_obj.correct_spatial_shifts(cliptoextent=CFG.clip_to_extent,
                                       clipextent=clip_extent, clipextent_prj=L2A_obj.arr.prj)

    # merge multiple subsystems belonging to the same scene ID to a single GMS object
    if len(L2A_objs) > 1:
        L2A_obj = L2A_P.L2A_object.from_sensor_subsystems(L2A_objs)
    else:
        L2A_obj = L2A_objs[0]

        # update attributes
        L2A_obj.calc_mask_nodata(overwrite=True)
        L2A_obj.calc_corner_positions()

    # write output
    if CFG.exec_L2AP[1]:
        L2A_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)

    # delete tempfiles of separate subsystem GMS objects
    # NOTE: a distinct loop variable is used so the merged L2A_obj is not rebound
    for subsystem_obj in L2A_objs:
        subsystem_obj.delete_tempFiles()

    if return_tiles:
        L2A_tiles = list(L2A_obj.to_tiles(blocksize=block_size if block_size else CFG.tiling_block_size_XY))
        L2A_tiles = [i for i in L2A_tiles if i is not None]  # None is returned in case the tile contains no data at all
        for L2A_tile in L2A_tiles:
            L2A_tile.record_mem_usage()
        return L2A_tiles
    else:
        L2A_obj.record_mem_usage()
        return L2A_obj
240
241


242
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2B_map(L2A_obj):
    # type: (L2A_P.L2A_object) -> L2B_P.L2B_object
    """Run Level-2B processing (spectral homogenization) on a single L2A object."""
    L2A_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)

    L2B_obj = L2B_P.L2B_object(L2A_obj)
    L2B_obj.spectral_homogenization()

    if CFG.exec_L2BP[1]:
        L2B_obj.to_ENVI(CFG.write_ENVIclassif_cloudmask)

    # temp files are only removed for full cubes, not for single tiles
    if L2B_obj.arr_shape == 'cube':
        L2B_obj.delete_tempFiles()

    L2B_obj.record_mem_usage()
    return L2B_obj

256

257
@EXC_H.log_uncaught_exceptions
@update_proc_status
def L2C_map(L2B_obj):
    # type: (L2B_P.L2B_object) -> L2C_P.L2C_object
    """Run Level-2C processing and optionally write the result as MGRS tiles.

    :param L2B_obj: the spectrally homogenized L2B object
    :return: the L2C object (contains no array data in Python mode)
    """
    L2B_obj.block_at_system_overload(max_usage=CFG.critical_mem_usage)

    L2C_obj = L2C_P.L2C_object(L2B_obj)

    if CFG.exec_L2CP[1]:
        # cut into MGRS tiles and write each to disk
        # (local name typo fixed: 'MRGS' -> 'MGRS'; explicit loop instead of a
        #  list comprehension used only for its side effects)
        L2C_MGRS_tiles = L2C_obj.to_MGRS_tiles(pixbuffer=CFG.mgrs_pixel_buffer)
        for MGRS_tile in L2C_MGRS_tiles:
            MGRS_tile.to_ENVI(CFG.write_ENVIclassif_cloudmask,
                              compression=CFG.output_data_compression)

    L2C_obj.record_mem_usage()
    L2C_obj.delete_tempFiles()
    return L2C_obj  # contains no array data in Python mode
Daniel Scheffler's avatar
Daniel Scheffler committed
270
271


272
@return_proc_reports_only
# @return_GMS_objs_without_arrays
def run_complete_preprocessing(list_dataset_dicts_per_scene):  # map (scene-wise parallelization)
    # type: (List[dict]) -> Union[L1A_P.GMS_object, List, Generator]
    """Run the complete preprocessing chain (L1A..L2C) for a single scene.

    All subsystem datasets of the scene are processed together. Processing starts at the
    level following each dataset's 'proc_level' and stops early when a processing level is
    disabled in the configuration or a failed_GMS_object is encountered.

    NOTE: Exceptions in this function must be caught by the calling function (process controller).

    :param list_dataset_dicts_per_scene: dataset dictionaries of all subsystems of one scene
                                         (all must share the same 'proc_level')
    :return: the last computed GMS object (or a list of objects / failure reports)
    """
    with GMS_logger('log__%s' % CFG.ID, fmt_suffix=list_dataset_dicts_per_scene[0]['scene_ID'],
                    log_level=CFG.log_level, append=True) as pipeline_logger:

        # set CPU and memory limits
        cpulimit = CFG.CPUs_all_jobs
        mem2reserve = 15  # GB; fallback reservation if no statistics are available

        if redis_conn:
            mem_estim = estimate_mem_usage(list_dataset_dicts_per_scene[0]['dataset_ID'],
                                           list_dataset_dicts_per_scene[0]['satellite'])
            if mem_estim:
                mem2reserve = mem_estim
            else:
                # no statistics yet -> derive a conservative process limit from free memory
                cpulimit = int((virtual_memory().total * .8 - virtual_memory().used) / 1024 ** 3 / mem2reserve)
                pipeline_logger.info('No memory usage statistics from earlier jobs found for the current '
                                     'configuration. Limiting processes to %s in order to collect memory statistics '
                                     'first.' % cpulimit)

        # start processing
        with MemoryReserver(mem2lock_gb=mem2reserve, logger=pipeline_logger, max_usage=CFG.max_mem_usage),\
                ProcessLock(allowed_slots=cpulimit, logger=pipeline_logger):

            # all subsystems of one scene must share the same processing level
            if len({ds['proc_level'] for ds in list_dataset_dicts_per_scene}) != 1:
                raise ValueError('Lists of subsystem datasets with different processing levels are not supported here. '
                                 'Received %s.' % list_dataset_dicts_per_scene)

            input_proc_level = list_dataset_dicts_per_scene[0]['proc_level']

            ##################
            # L1A processing #
            ##################

            L1A_objects = []
            if CFG.exec_L1AP[0] and input_proc_level is None:
                L1A_objects = \
                    [L1A_map(subsystem_dataset_dict) for subsystem_dataset_dict in list_dataset_dicts_per_scene]

                if any(isinstance(obj, failed_GMS_object) for obj in L1A_objects):
                    return L1A_objects

            ##################
            # L1B processing #
            ##################

            # select subsystem with optimal band for co-registration
            # L1B_obj_coreg = L1B_map(L1A_objects[0])
            # copy coreg information to remaining subsets

            L1B_objects = L1A_objects
            if CFG.exec_L1BP[0]:
                # add earlier processed L1A data
                if input_proc_level == 'L1A':
                    for ds in list_dataset_dicts_per_scene:
                        GMSfile = path_generator(ds, proc_level='L1A').get_path_gmsfile()
                        L1A_objects.append(L1A_P.L1A_object.from_disk([GMSfile, ['cube', None]]))

                L1B_objects = [L1B_map(L1A_obj) for L1A_obj in L1A_objects]

                del L1A_objects

                if any(isinstance(obj, failed_GMS_object) for obj in L1B_objects):
                    return L1B_objects

            ##################
            # L1C processing #
            ##################

            L1C_objects = L1B_objects
            if CFG.exec_L1CP[0]:
                # add earlier processed L1B data
                if input_proc_level == 'L1B':
                    for ds in list_dataset_dicts_per_scene:
                        GMSfile = path_generator(ds, proc_level='L1B').get_path_gmsfile()
                        L1B_objects.append(L1B_P.L1B_object.from_disk([GMSfile, ['cube', None]]))

                L1C_objects = L1C_map(L1B_objects)

                del L1B_objects

                if any(isinstance(obj, failed_GMS_object) for obj in L1C_objects):
                    return L1C_objects

            if not CFG.exec_L2AP[0]:
                return L1C_objects

            ##################
            # L2A processing #
            ##################

            # add earlier processed L1C data
            if input_proc_level == 'L1C':
                for ds in list_dataset_dicts_per_scene:
                    GMSfile = path_generator(ds, proc_level='L1C').get_path_gmsfile()
                    L1C_objects.append(L1C_P.L1C_object.from_disk([GMSfile, ['cube', None]]))

            # NOTE: L2A_map merges all subsystems into a single object
            L2A_obj = L2A_map(L1C_objects, return_tiles=False)

            del L1C_objects

            if isinstance(L2A_obj, failed_GMS_object) or not CFG.exec_L2BP[0]:
                return L2A_obj

            ##################
            # L2B processing #
            ##################

            # add earlier processed L2A data
            if input_proc_level == 'L2A':
                assert len(list_dataset_dicts_per_scene) == 1, \
                    'Expected only a single L2A dataset since subsystems are merged.'
                GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2A').get_path_gmsfile()
                L2A_obj = L2A_P.L2A_object.from_disk([GMSfile, ['cube', None]])

            L2B_obj = L2B_map(L2A_obj)

            del L2A_obj

            if isinstance(L2B_obj, failed_GMS_object) or not CFG.exec_L2CP[0]:
                return L2B_obj

            ##################
            # L2C processing #
            ##################

            # add earlier processed L2B data
            if input_proc_level == 'L2B':
                assert len(list_dataset_dicts_per_scene) == 1, \
                    'Expected only a single L2B dataset since subsystems are merged.'
                GMSfile = path_generator(list_dataset_dicts_per_scene[0], proc_level='L2B').get_path_gmsfile()
                L2B_obj = L2B_P.L2B_object.from_disk([GMSfile, ['cube', None]])

            L2C_obj = L2C_map(L2B_obj)  # type: Union[GMS_object, failed_GMS_object, List]

            del L2B_obj

            return L2C_obj