Commit 3d4c735c authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Revised SensorMapGeometryTransformer + tests.

parent 7503cec9
Pipeline #3407 passed with stages
in 1 minute and 38 seconds
......@@ -5,7 +5,7 @@ import warnings
import multiprocessing
import os
from tempfile import TemporaryDirectory
from typing import Union, Tuple, List, Any, Iterable # noqa: F401
from typing import Union, Tuple, List, Any # noqa: F401
# custom
try:
......@@ -307,7 +307,8 @@ def warp_ndarray(ndarray, in_gt, in_prj=None, out_prj=None, out_dtype=None,
if not isinstance(ndarray, (list, tuple)):
assert str(np.dtype(ndarray.dtype)) in dTypeDic_NumPy2GDAL, "Unknown target datatype '%s'." % ndarray.dtype
else:
assert str(np.dtype(ndarray[0].dtype)) in dTypeDic_NumPy2GDAL, "Unknown target datatype '%s'." % ndarray.dtype
assert str(np.dtype(ndarray[0].dtype)) in dTypeDic_NumPy2GDAL, "Unknown target datatype '%s'." \
% ndarray[0].dtype
assert isinstance(in_gt, (list, tuple)), "If 'ndarray' is a list, 'in_gt' must also be a list!"
assert isinstance(in_prj, (list, tuple)), "If 'ndarray' is a list, 'in_prj' must also be a list!"
assert len(list(set([arr.dtype for arr in ndarray]))) == 1, "Data types of input ndarray list must be equal."
......@@ -497,13 +498,12 @@ def warp_ndarray(ndarray, in_gt, in_prj=None, out_prj=None, out_dtype=None,
class SensorMapGeometryTransformer(object):
def __init__(self, data, lons, lats, resamp_alg='nearest', radius_of_influence=30, **opts):
# type: (np.ndarray, np.ndarray, np.ndarray, str, int, Any) -> None
def __init__(self, lons, lats, resamp_alg='nearest', radius_of_influence=30, **opts):
# type: (np.ndarray, np.ndarray, str, int, Any) -> None
"""Get an instance of SensorMapGeometryTransformer.
:param data: numpy array to be warped to sensor or map geometry
:param lons: 2D longitude array
:param lats: 2D latitude array
:param lons: 2D longitude array corresponding to the sensor geometry array
:param lats: 2D latitude array corresponding to the sensor geometry array
:Keyword Arguments: (further documentation here: https://pyresample.readthedocs.io/en/latest/swath.html)
- resamp_alg: resampling algorithm ('nearest', 'bilinear', 'gauss', 'custom')
......@@ -534,7 +534,6 @@ class SensorMapGeometryTransformer(object):
if lats.ndim > 2:
raise NotImplementedError(lats, '%dD latitude arrays are not yet supported.' % lats.ndim)
self.data = data
self.resamp_alg = resamp_alg
self.opts = dict(radius_of_influence=radius_of_influence,
sigmas=(radius_of_influence / 2))
......@@ -567,10 +566,11 @@ class SensorMapGeometryTransformer(object):
return tgt_extent
def compute_output_shape(self, tgt_prj, tgt_extent=None, tgt_res=None):
# type: (Union[int, str], Iterable[float, float, float, float], Tuple[float, float]) -> (int, int, tuple, ...)
"""Estimates the map geometry output shape of a sensor geometry array resampled to map geometry.
def compute_areadefinition_sensor2map(self, data, tgt_prj, tgt_extent=None, tgt_res=None):
# type: (np.ndarray, Union[int, str], Tuple[float, float, float, float], Tuple[float, float]) -> AreaDefinition
"""Compute the area_definition to resample a sensor geometry array to map geometry.
:param data: numpy array to be warped to sensor or map geometry
:param tgt_prj: target projection (WKT or 'epsg:1234' or <EPSG_int>)
:param tgt_extent: extent coordinates of output map geometry array (LL_x, LL_y, UR_x, UR_y) in the tgt_prj
(automatically computed from the corner positions of the coordinate arrays)
......@@ -606,7 +606,7 @@ class SensorMapGeometryTransformer(object):
del ds_xy_coords, vrt
# create VRT for one data band
mask_band = np.ones((self.data.shape[:2]), np.int32)
mask_band = np.ones((data.shape[:2]), np.int32)
write_numpy_to_image(mask_band, path_data, 'ENVI')
ds_data = gdal.Open(path_data)
vrt = drv_vrt.CreateCopy(path_datavrt, ds_data)
......@@ -647,7 +647,6 @@ class SensorMapGeometryTransformer(object):
x_size = ds_out.RasterXSize
y_size = ds_out.RasterYSize
out_gt = ds_out.GetGeoTransform()
out_prj = ds_out.GetProjection()
del ds_out
# add 1 px buffer around out_extent to avoid cutting the output image
......@@ -658,86 +657,105 @@ class SensorMapGeometryTransformer(object):
out_gt[3] += abs(out_gt[5])
out_gt = tuple(out_gt)
xmin, xmax, ymin, ymax = corner_coord_to_minmax(get_corner_coordinates(gt=out_gt, cols=x_size, rows=y_size))
out_extent = list((xmin, ymin, xmax, ymax))
return x_size, y_size, out_gt, out_prj, out_extent
def _resample(self, source_geo_def, target_geo_def):
# type: (Union[AreaDefinition, SwathDefinition], Union[AreaDefinition, SwathDefinition]) -> np.ndarray
out_extent = xmin, ymin, xmax, ymax
# get area_definition
area_definition = get_area_def(area_id='',
area_name='',
proj_id='',
proj4_args=get_proj4info(proj=tgt_prj),
x_size=x_size,
y_size=y_size,
area_extent=list(out_extent),
) # type: AreaDefinition
return area_definition
def _resample(self, data, source_geo_def, target_geo_def):
# type: (np.ndarray, Union[AreaDefinition, SwathDefinition], Union[AreaDefinition, SwathDefinition]) -> ...
"""Run the resampling algorithm.
:param data: numpy array to be warped to sensor or map geometry
:param source_geo_def: source geo definition
:param target_geo_def: target geo definition
:return:
"""
if self.resamp_alg == 'nearest':
opts = {k: v for k, v in self.opts.items() if k not in ['sigmas']}
result = resample_nearest(source_geo_def, self.data, target_geo_def, **opts)
result = resample_nearest(source_geo_def, data, target_geo_def, **opts)
elif self.resamp_alg == 'bilinear':
opts = {k: v for k, v in self.opts.items() if k not in ['sigmas']}
result = resample_bilinear(self.data, source_geo_def, target_geo_def, **opts)
result = resample_bilinear(data, source_geo_def, target_geo_def, **opts)
elif self.resamp_alg == 'gauss':
opts = {k: v for k, v in self.opts.items()}
result = resample_gauss(source_geo_def, self.data, target_geo_def, **opts)
result = resample_gauss(source_geo_def, data, target_geo_def, **opts)
elif self.resamp_alg == 'custom':
opts = {k: v for k, v in self.opts.items()}
if 'weight_funcs' not in opts:
raise ValueError(opts, "Options must contain a 'weight_funcs' item.")
result = resample_custom(source_geo_def, self.data, target_geo_def, **opts)
result = resample_custom(source_geo_def, data, target_geo_def, **opts)
else:
raise ValueError(self.resamp_alg)
return result
return result # type: np.ndarray
@staticmethod
def _get_gt_prj_from_areadefinition(area_definition):
# type: (AreaDefinition) -> (Tuple[float, float, float, float, float, float], str)
gt = area_definition.area_extent[0], area_definition.pixel_size_x, 0, \
area_definition.area_extent[3], 0, -area_definition.pixel_size_y
prj = proj4_to_WKT(area_definition.proj_str)
def to_map_geometry(self, tgt_prj, tgt_extent=None, tgt_res=None):
# type: (Union[str, int], Iterable[float, float, float, float], Tuple[float, float]) -> (np.ndarray, tuple, str)
return gt, prj
def to_map_geometry(self, data, tgt_prj, tgt_extent=None, tgt_res=None, area_definition=None):
# type: (np.ndarray, Union[str, int], Tuple[float, float, float, float], Tuple, AreaDefinition) -> ...
"""Transform the input sensor geometry array into map geometry.
:param tgt_prj: target projection (WKT or 'epsg:1234' or <EPSG_int>)
:param tgt_extent: extent coordinates of output map geometry array (LL_x, LL_y, UR_x, UR_y) in the tgt_prj
:param tgt_res: target X/Y resolution (e.g., (30, 30))
:param data: numpy array (representing sensor geometry) to be warped to map geometry
:param tgt_prj: target projection (WKT or 'epsg:1234' or <EPSG_int>)
:param tgt_extent: extent coordinates of output map geometry array (LL_x, LL_y, UR_x, UR_y) in the tgt_prj
:param tgt_res: target X/Y resolution (e.g., (30, 30))
:param area_definition: an instance of pyresample.geometry.AreaDefinition; replaces tgt_extent and tgt_res;
saves computation time
"""
cols_out, rows_out, out_gt, out_prj, out_extent = \
self.compute_output_shape(tgt_prj=tgt_prj, tgt_extent=tgt_extent, tgt_res=tgt_res)
if not self.area_definition:
self.area_definition = \
get_area_def(area_id='',
area_name='',
proj_id='',
proj4_args=get_proj4info(proj=tgt_prj),
x_size=cols_out,
y_size=rows_out,
area_extent=out_extent,
) # type: AreaDefinition
data_mapgeo = self._resample(self.swath_definition, self.area_definition)
if not data.shape == self.lons.shape == self.lats.shape:
raise ValueError(data.shape,
'Expected a sensor geometry data array with a shape of %s.' % str(self.lons.shape))
self.area_definition = area_definition or \
self.compute_areadefinition_sensor2map(
data, tgt_prj=tgt_prj, tgt_extent=tgt_extent, tgt_res=tgt_res)
data_mapgeo = self._resample(data, self.swath_definition, self.area_definition)
out_gt, out_prj = self._get_gt_prj_from_areadefinition(self.area_definition)
# output validation
if not data_mapgeo.shape == (rows_out, cols_out):
if not data_mapgeo.shape == (self.area_definition.y_size, self.area_definition.x_size):
raise RuntimeError('The computed map geometry output does not have the expected shape. '
'Expected: %s; output array shape: %s.'
% (rows_out, cols_out), data_mapgeo.shape)
% (self.area_definition.y_size, self.area_definition.x_size), data_mapgeo.shape)
return data_mapgeo, out_gt, out_prj
return data_mapgeo, out_gt, out_prj # type: Tuple[np.ndarray, tuple, str]
def to_sensor_geometry(self, src_prj, src_extent):
# type: (Union[str, int], List[float, float, float, float]) -> np.ndarray
def to_sensor_geometry(self, data, src_prj, src_extent):
# type: (np.ndarray, Union[str, int], List[float, float, float, float]) -> np.ndarray
"""Transform the input map geometry array into sensor geometry
:param data: numpy array (representing map geometry) to be warped to sensor geometry
:param src_prj: projection of the input map geometry array (WKT or 'epsg:1234' or <EPSG_int>)
:param src_extent: extent coordinates of input map geometry array (LL_x, LL_y, UR_x, UR_y) in the src_prj
"""
proj4_args = proj4_to_dict(get_proj4info(proj=src_prj))
self.area_definition = AreaDefinition('', '', '', proj4_args, self.data.shape[1], self.data.shape[0],
self.area_definition = AreaDefinition('', '', '', proj4_args, data.shape[1], data.shape[0],
src_extent)
data_sensorgeo = self._resample(self.area_definition, self.swath_definition)
data_sensorgeo = self._resample(data, self.area_definition, self.swath_definition)
# output validation
if not data_sensorgeo.shape == self.lats.shape:
......
......@@ -44,22 +44,21 @@ class Test_SensorMapGeometryTransformer(TestCase):
5267203.56579] # UR_y
def test_to_sensor_geometry(self):
SMGT = SensorMapGeometryTransformer(self.dem_map_geo,
lons=self.lons,
SMGT = SensorMapGeometryTransformer(lons=self.lons,
lats=self.lats,
resamp_alg='nearest')
dem_sensors_geo = SMGT.to_sensor_geometry(src_prj=32632, src_extent=self.dem_area_extent_coarse_subset_utm)
dem_sensors_geo = SMGT.to_sensor_geometry(self.dem_map_geo,
src_prj=32632, src_extent=self.dem_area_extent_coarse_subset_utm)
self.assertIsInstance(dem_sensors_geo, np.ndarray)
self.assertEquals(dem_sensors_geo.shape, (150, 1000))
def test_to_map_geometry_lonlat(self):
SMGT = SensorMapGeometryTransformer(self.dem_sensor_geo,
lons=self.lons,
SMGT = SensorMapGeometryTransformer(lons=self.lons,
lats=self.lats,
resamp_alg='nearest')
# to Lon/Lat
dem_map_geo, dem_gt, dem_prj = SMGT.to_map_geometry(tgt_prj=4326)
dem_map_geo, dem_gt, dem_prj = SMGT.to_map_geometry(self.dem_sensor_geo, tgt_prj=4326)
self.assertIsInstance(dem_map_geo, np.ndarray)
self.assertEquals(dem_map_geo.shape, (286, 1058))
xmin, xmax, ymin, ymax = corner_coord_to_minmax(get_corner_coordinates(gt=dem_gt,
......@@ -68,14 +67,16 @@ class Test_SensorMapGeometryTransformer(TestCase):
self.assertTrue(False not in np.isclose(np.array([xmin, ymin, xmax, ymax]),
np.array(self.expected_dem_area_extent_lonlat)))
with self.assertRaises(ValueError):
SMGT.to_map_geometry(self.dem_sensor_geo[:10, :10], tgt_prj=4326) # must have the shape of lons/lats
def test_to_map_geometry_utm(self):
SMGT = SensorMapGeometryTransformer(self.dem_sensor_geo,
lons=self.lons,
SMGT = SensorMapGeometryTransformer(lons=self.lons,
lats=self.lats,
resamp_alg='nearest')
# to UTM32
dem_map_geo, dem_gt, dem_prj = SMGT.to_map_geometry(tgt_prj=32632, tgt_res=(30, 30))
dem_map_geo, dem_gt, dem_prj = SMGT.to_map_geometry(self.dem_sensor_geo, tgt_prj=32632, tgt_res=(30, 30))
self.assertIsInstance(dem_map_geo, np.ndarray)
self.assertEquals(dem_map_geo.shape, (365, 975))
xmin, xmax, ymin, ymax = corner_coord_to_minmax(get_corner_coordinates(gt=dem_gt,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment