# -*- coding: utf-8 -*- # sensormapgeo, Transform remote sensing images between sensor and map geometry. # # Copyright (C) 2020 Daniel Scheffler (GFZ Potsdam, danschef@gfz-potsdam.de) # # This software was developed within the context of the EnMAP project supported # by the DLR Space Administration with funds of the German Federal Ministry of # Economic Affairs and Energy (on the basis of a decision by the German Bundestag: # 50 EE 1529) and contributions from DLR, GFZ and OHB System AG. # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Lesser General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) any # later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License along # with this program. If not, see . """Module to transform 3D arrays between sensor and map geometry (using band-wise Lon/Lat arrays).""" from typing import Union, Tuple import multiprocessing import numpy as np from py_tools_ds.geo.projection import WKT2EPSG, proj4_to_WKT from py_tools_ds.geo.coord_trafo import get_proj4info from .transformer_2d import SensorMapGeometryTransformer, _corner_coords_lonlat_to_extent class SensorMapGeometryTransformer3D(object): def __init__(self, lons: np.ndarray, lats: np.ndarray, resamp_alg: str = 'nearest', radius_of_influence: int = 30, mp_alg: str = 'auto', **opts) -> None: """Get an instance of SensorMapGeometryTransformer. :param lons: 3D longitude array corresponding to the 3D sensor geometry array :param lats: 3D latitude array corresponding to the 3D sensor geometry array :Keyword Arguments: (further documentation here: https://pyresample.readthedocs.io/en/latest/swath.html) - resamp_alg: resampling algorithm ('nearest', 'bilinear', 'gauss', 'custom') - radius_of_influence: Cut off distance in meters (default: 30) NOTE: keyword is named 'radius' in case of bilinear resampling - mp_alg multiprocessing algorithm 'bands': parallelize over bands using multiprocessing lib 'tiles': parallelize over tiles using OpenMP 'auto': automatically choose the algorithm - sigmas: [ONLY 'gauss'] List of sigmas to use for the gauss weighting of each channel 1 to k, w_k = exp(-dist^2/sigma_k^2). If only one channel is resampled sigmas is a single float value. - neighbours: [ONLY 'bilinear', 'gauss'] Number of neighbours to consider for each grid point when searching the closest corner points - epsilon: Allowed uncertainty in meters. Increasing uncertainty reduces execution time - weight_funcs: [ONLY 'custom'] List of weight functions f(dist) to use for the weighting of each channel 1 to k. If only one channel is resampled weight_funcs is a single function object. - fill_value: Set undetermined pixels to this value (default: 0). If fill_value is None a masked array is returned with undetermined pixels masked - reduce_data: Perform initial coarse reduction of source dataset in order to reduce execution time - nprocs: , Number of processor cores to be used - segments: Number of segments to use when resampling. If set to None an estimate will be calculated - with_uncert: [ONLY 'gauss' and 'custom'] Calculate uncertainty estimates NOTE: resampling function has 3 return values instead of 1: result, stddev, count """ # validation if lons.ndim != 3: raise ValueError('Expected a 3D longitude array. Received a %dD array.' % lons.ndim) if lats.ndim != 3: raise ValueError('Expected a 3D latitude array. Received a %dD array.' % lats.ndim) if lons.shape != lats.shape: raise ValueError((lons.shape, lats.shape), "'lons' and 'lats' are expected to have the same shape.") self.lats = lats self.lons = lons self.resamp_alg = resamp_alg self.radius_of_influence = radius_of_influence self.opts = opts # define number of CPUs to use (but avoid sub-multiprocessing) # -> parallelize either over bands or over image tiles # bands: multiprocessing uses multiprocessing.Pool, implemented in to_map_geometry / to_sensor_geometry # tiles: multiprocessing uses OpenMP implemented in pykdtree which is used by pyresample self.opts['nprocs'] = opts.get('nprocs', multiprocessing.cpu_count()) self.mp_alg = ('bands' if self.lons.shape[2] >= opts['nprocs'] else 'tiles') if mp_alg == 'auto' else mp_alg @staticmethod def _to_map_geometry_2D(kwargs_dict: dict) -> Tuple[np.ndarray, tuple, str, int]: assert [var is not None for var in (_global_shared_lons, _global_shared_lats, _global_shared_data)] SMGT2D = SensorMapGeometryTransformer(lons=_global_shared_lons[:, :, kwargs_dict['band_idx']], lats=_global_shared_lats[:, :, kwargs_dict['band_idx']], resamp_alg=kwargs_dict['resamp_alg'], radius_of_influence=kwargs_dict['radius_of_influence'], **kwargs_dict['init_opts']) data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry(data=_global_shared_data[:, :, kwargs_dict['band_idx']], tgt_prj=kwargs_dict['tgt_prj'], tgt_extent=kwargs_dict['tgt_extent'], tgt_res=kwargs_dict['tgt_res']) return data_mapgeo, out_gt, out_prj, kwargs_dict['band_idx'] def _get_common_target_extent(self, tgt_epsg): corner_coords_ll = [[self.lons[0, 0, :].min(), self.lats[0, 0, :].max()], # common UL_xy [self.lons[0, -1, :].max(), self.lats[0, -1, :].max()], # common UR_xy [self.lons[-1, 0, :].min(), self.lats[-1, 0, :].min()], # common LL_xy [self.lons[-1, -1, :].max(), self.lats[-1, -1, :].min()], # common LR_xy ] tgt_extent = _corner_coords_lonlat_to_extent(corner_coords_ll, tgt_epsg) return tgt_extent def to_map_geometry(self, data: np.ndarray, tgt_prj: Union[str, int], tgt_extent: Tuple[float, float, float, float] = None, tgt_res: Tuple = None) -> Tuple[np.ndarray, tuple, str]: """Transform the input sensor geometry array into map geometry. :param data: 3D numpy array (representing sensor geometry) to be warped to map geometry :param tgt_prj: target projection (WKT or 'epsg:1234' or ) :param tgt_extent: extent coordinates of output map geometry array (LL_x, LL_y, UR_x, UR_y) in the tgt_prj :param tgt_res: target X/Y resolution (e.g., (30, 30)) """ if data.ndim != 3: raise ValueError(data.ndim, "'data' must have 3 dimensions.") if data.shape != self.lons.shape: raise ValueError(data.shape, 'Expected a sensor geometry data array with %d rows, %d columns and %d bands.' % self.lons.shape) # get common target extent tgt_epsg = WKT2EPSG(proj4_to_WKT(get_proj4info(proj=tgt_prj))) tgt_extent = tgt_extent or self._get_common_target_extent(tgt_epsg) init_opts = self.opts.copy() if self.mp_alg == 'bands': del init_opts['nprocs'] # avoid sub-multiprocessing args = [dict( resamp_alg=self.resamp_alg, radius_of_influence=self.radius_of_influence, init_opts=init_opts, tgt_prj=tgt_prj, tgt_extent=tgt_extent, tgt_res=tgt_res, band_idx=band ) for band in range(data.shape[2])] if self.opts['nprocs'] > 1 and self.mp_alg == 'bands': with multiprocessing.Pool(self.opts['nprocs'], initializer=_initializer, initargs=(self.lats, self.lons, data)) as pool: result = pool.map(self._to_map_geometry_2D, args) else: _initializer(self.lats, self.lons, data) result = [self._to_map_geometry_2D(argsdict) for argsdict in args] band_inds = list(np.array(result)[:, -1]) data_mapgeo = np.dstack([result[band_inds.index(i)][0] for i in range(data.shape[2])]) out_gt = result[0][1] out_prj = result[0][2] return data_mapgeo, out_gt, out_prj @staticmethod def _to_sensor_geometry_2D(kwargs_dict: dict) -> (np.ndarray, int): assert [var is not None for var in (_global_shared_lons, _global_shared_lats, _global_shared_data)] SMGT2D = SensorMapGeometryTransformer(lons=_global_shared_lons[:, :, kwargs_dict['band_idx']], lats=_global_shared_lats[:, :, kwargs_dict['band_idx']], resamp_alg=kwargs_dict['resamp_alg'], radius_of_influence=kwargs_dict['radius_of_influence'], **kwargs_dict['init_opts']) data_sensorgeo = SMGT2D.to_sensor_geometry(data=_global_shared_data[:, :, kwargs_dict['band_idx']], src_prj=kwargs_dict['src_prj'], src_extent=kwargs_dict['src_extent']) return data_sensorgeo, kwargs_dict['band_idx'] def to_sensor_geometry(self, data: np.ndarray, src_prj: Union[str, int], src_extent: Tuple[float, float, float, float]) -> np.ndarray: """Transform the input map geometry array into sensor geometry :param data: 3D numpy array (representing map geometry) to be warped to sensor geometry :param src_prj: projection of the input map geometry array (WKT or 'epsg:1234' or ) :param src_extent: extent coordinates of input map geometry array (LL_x, LL_y, UR_x, UR_y) in the src_prj """ if data.ndim != 3: raise ValueError(data.ndim, "'data' must have 3 dimensions.") init_opts = self.opts.copy() if self.mp_alg == 'bands': del init_opts['nprocs'] # avoid sub-multiprocessing args = [dict( resamp_alg=self.resamp_alg, radius_of_influence=self.radius_of_influence, init_opts=init_opts, src_prj=src_prj, src_extent=src_extent, band_idx=band ) for band in range(data.shape[2])] if self.opts['nprocs'] > 1 and self.mp_alg == 'bands': with multiprocessing.Pool(self.opts['nprocs'], initializer=_initializer, initargs=(self.lats, self.lons, data)) as pool: result = pool.map(self._to_sensor_geometry_2D, args) else: _initializer(self.lats, self.lons, data) result = [self._to_sensor_geometry_2D(argsdict) for argsdict in args] band_inds = list(np.array(result)[:, -1]) data_sensorgeo = np.dstack([result[band_inds.index(i)][0] for i in range(data.shape[2])]) return data_sensorgeo _global_shared_lats = None _global_shared_lons = None _global_shared_data = None def _initializer(lats, lons, data): """Declare global variables needed for SensorMapGeometryTransformer3D.to_map_geometry and to_sensor_geometry. :param lats: :param lons: :param data: """ global _global_shared_lats, _global_shared_lons, _global_shared_data _global_shared_lats = lats _global_shared_lons = lons _global_shared_data = data