Skip to content
Snippets Groups Projects
Commit ee893d8f authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Added class 'VNIR_SWIR_SensorGeometryTransformer' to transform between VNIR...

Added class 'VNIR_SWIR_SensorGeometryTransformer' to transform between VNIR and SWIR sensor geometry + tests. Updated minimal version of py_tools_ds.

Signed-off-by: default avatarDaniel Scheffler <danschef@gfz-potsdam.de>
parent 5da736bd
Branches
Tags
1 merge request!27Feature/add sensorgeo transformation
Pipeline #5062 passed
......@@ -18,7 +18,7 @@ test_enpt:
- pip install 'geoarray>=0.8.11'
# update py_tools_ds
- pip install 'py_tools_ds>=0.14.14'
- pip install 'py_tools_ds>=0.14.22'
# switch sicor branch
- mkdir /tmp/sicor_tables
......
......@@ -28,7 +28,7 @@ Using conda_, the recommended approach is:
# install py_tools_ds
conda install -c conda-forge pyresample
pip install 'py_tools_ds>=0.14.14'
pip install 'py_tools_ds>=0.14.22'
# install sicor
conda install -c conda-forge pygrib h5py pytables pyfftw numba llvmlite scikit-learn
......
......@@ -61,7 +61,7 @@ class Geometry_Transformer(SensorMapGeometryTransformer):
src_extent=src_extent or list(np.array(data_mapgeo.box.boundsMap)[[0, 2, 1, 3]]))
def to_map_geometry(self,
path_or_geoarray_sensorgeo: Union[str, GeoArray],
path_or_geoarray_sensorgeo: Union[str, GeoArray, np.ndarray],
tgt_prj: Union[str, int] = None,
tgt_extent: Tuple[float, float, float, float] = None,
tgt_res: Tuple[float, float] = None,
......@@ -118,7 +118,7 @@ class Geometry_Transformer_3D(SensorMapGeometryTransformer3D):
src_extent=src_extent or list(np.array(data_mapgeo.box.boundsMap)[[0, 2, 1, 3]]))
def to_map_geometry(self,
path_or_geoarray_sensorgeo: Union[str, GeoArray],
path_or_geoarray_sensorgeo: Union[str, GeoArray, np.ndarray],
tgt_prj: Union[str, int] = None,
tgt_extent: Tuple[float, float, float, float] = None,
tgt_res: Tuple[float, float] = None):
......@@ -151,6 +151,87 @@ class Geometry_Transformer_3D(SensorMapGeometryTransformer3D):
return out_data, out_gt, out_prj
class VNIR_SWIR_SensorGeometryTransformer(object):
def __init__(self,
lons_vnir: np.ndarray,
lats_vnir: np.ndarray,
lons_swir: np.ndarray,
lats_swir: np.ndarray,
prj_vnir: Union[str, int],
prj_swir: Union[str, int],
res_vnir: Tuple[float, float],
res_swir: Tuple[float, float],
resamp_alg: str = 'nearest') -> None:
"""Get an instance of VNIR_SWIR_SensorGeometryTransformer.
:param lons_vnir: 2D or 3D VNIR longitude array corresponding to the sensor geometry arrays passed later
:param lats_vnir: 2D or 3D VNIR latitude array corresponding to the sensor geometry arrays passed later
:param lons_swir: 2D or 3D SWIR longitude array corresponding to the sensor geometry arrays passed later
:param lats_swir: 2D or 3D SWIR latitude array corresponding to the sensor geometry arrays passed later
:param prj_vnir: projection of the VNIR if it would be transformed to map geometry (WKT string or EPSG code)
:param prj_swir: projection of the SWIR if it would be transformed to map geometry (WKT string or EPSG code)
:param res_vnir: pixel dimensions of the VNIR if it would be transformed to map geometry (X, Y)
:param res_swir: pixel dimensions of the SWIR if it would be transformed to map geometry (X, Y)
:param resamp_alg: resampling algorithm ('nearest', 'gauss')
"""
[self._validate_lonlat_ndim(ll) for ll in [lons_vnir, lats_vnir, lons_swir, lats_swir]]
self.vnir_meta = dict(
lons=lons_vnir,
lats=lats_vnir,
prj=prj_vnir,
res=res_vnir
)
self.swir_meta = dict(
lons=lons_swir,
lats=lats_swir,
prj=prj_swir,
res=res_swir
)
self.resamp_alg = resamp_alg
def _validate_lonlat_ndim(self, coord_array):
if coord_array.ndim > 2:
raise NotImplementedError("%dD longitude/latitude array are currently not supported.")
def transform_sensorgeo_VNIR_to_SWIR(self, data_vnirsensorgeo: np.ndarray) -> np.ndarray:
"""Transform any array in VNIR sensor geometry to SWIR sensor geometry to remove geometric shifts.
:param data_vnirsensorgeo: input array in VNIR sensor geometry
:return: input array resampled to SWIR sensor geometry
"""
return self._transform_sensorgeo(data_vnirsensorgeo, inputgeo='vnir')
def transform_sensorgeo_SWIR_to_VNIR(self, data_swirsensorgeo: np.ndarray) -> np.ndarray:
"""Transform any array in SWIR sensor geometry to VNIR sensor geometry to remove geometric shifts.
:param data_swirsensorgeo: input array in SWIR sensor geometry
:return: input array resampled to VNIR sensor geometry
"""
return self._transform_sensorgeo(data_swirsensorgeo, inputgeo='swir')
def _transform_sensorgeo(self,
data2transform: np.ndarray,
inputgeo: str = 'vnir') -> np.ndarray:
if inputgeo not in ['vnir', 'swir']:
raise ValueError(inputgeo)
src, tgt = (self.vnir_meta, self.swir_meta) if inputgeo == 'vnir' else (self.swir_meta, self.vnir_meta)
# compute area definition of target geometry
GT_tgt = Geometry_Transformer(lons=tgt['lons'], lats=tgt['lats'], resamp_alg=self.resamp_alg)
areadef_tgt = GT_tgt.compute_areadefinition_sensor2map(data2transform, tgt['prj'], tgt_res=tgt['res'])
# temporarily transform the input sensor geometry array to map geometry
GT_src = Geometry_Transformer(lons=src['lons'], lats=src['lats'], resamp_alg=self.resamp_alg)
gA_mapgeo = GeoArray(*GT_src.to_map_geometry(data2transform, tgt_prj=src['prj'], area_definition=areadef_tgt))
# generate the target sensor geometry array
tgt_data_sensorgeo = GT_src.to_sensor_geometry(gA_mapgeo)
return tgt_data_sensorgeo
def move_extent_to_EnMAP_grid(extent_utm: Tuple[float, float, float, float]) -> Tuple[float, float, float, float]:
"""Move a UTM coordinate extent to the EnMAP coordinate grid (30m x 30m, origin at 0/0).
......
......@@ -2,7 +2,7 @@ numpy
pandas
scipy
geoarray>=0.8.11
py_tools_ds>=0.14.14
py_tools_ds>=0.14.22
cerberus
jsmin
matplotlib
......
......@@ -36,7 +36,7 @@ with open("enpt/version.py") as version_file:
exec(version_file.read(), version)
requirements = [ # put package requirements here
'numpy', 'pandas', 'scipy', 'geoarray>=0.8.11', 'py_tools_ds>=0.14.14', 'cerberus', 'jsmin', 'matplotlib', 'tqdm',
'numpy', 'pandas', 'scipy', 'geoarray>=0.8.11', 'py_tools_ds>=0.14.22', 'cerberus', 'jsmin', 'matplotlib', 'tqdm',
'utm', 'lxml', 'numpy-indexed'
# 'sicor', # pip install git+https://gitext.gfz-potsdam.de/EnMAP/sicor.git
]
......
......@@ -45,7 +45,7 @@ dependencies:
- pip:
- scipy
- geoarray>=0.8.11
- py_tools_ds>=0.14.14
- py_tools_ds>=0.14.22
- cerberus
- jsmin
- tqdm
......
This diff is collapsed.
......@@ -41,8 +41,9 @@ import numpy as np
from geoarray import GeoArray
from py_tools_ds.geo.coord_grid import is_point_on_grid
from enpt.processors.spatial_transform import Geometry_Transformer, RPC_Geolayer_Generator
from enpt.options.config import config_for_testing, EnPTConfig
from enpt.processors.spatial_transform import \
Geometry_Transformer, RPC_Geolayer_Generator, VNIR_SWIR_SensorGeometryTransformer
from enpt.options.config import config_for_testing, EnPTConfig, config_for_testing_dlr
from enpt.io.reader import L1B_Reader
from enpt.options.config import enmap_coordinate_grid
......@@ -97,6 +98,63 @@ class Test_Geometry_Transformer(TestCase):
self.assertEqual(data_sensorgeo.shape, self.gA2transform_sensorgeo.shape)
class Test_VNIR_SWIR_SensorGeometryTransformer(TestCase):
@classmethod
def setUpClass(cls) -> None:
config = EnPTConfig(**config_for_testing_dlr)
# get lons / lats
with TemporaryDirectory() as td, ZipFile(config.path_l1b_enmap_image, "r") as zf:
zf.extractall(td)
cls.L1_obj = L1B_Reader(config=config).read_inputdata(
root_dir_main=td,
compute_snr=False)
cls.data2transform_vnir_sensorgeo = cls.L1_obj.vnir.data[:, :, 50] # a single VNIR band in sensor geometry
cls.gA2transform_vnir_mapgeo = GeoArray(config.path_dem) # a DEM in map geometry given by the user
cls.data2transform_swir_sensorgeo = cls.L1_obj.swir.data[:, :, 50] # a single SWIR band in sensor geometry
cls.gA2transform_swir_mapgeo = GeoArray(config.path_dem) # a DEM in map geometry given by the user
cls.VS_SGT = VNIR_SWIR_SensorGeometryTransformer(lons_vnir=cls.L1_obj.meta.vnir.lons[:, :, 0],
lats_vnir=cls.L1_obj.meta.vnir.lats[:, :, 0],
lons_swir=cls.L1_obj.meta.swir.lons[:, :, 0],
lats_swir=cls.L1_obj.meta.swir.lats[:, :, 0],
prj_vnir=32632,
prj_swir=32632,
res_vnir=(30, 30),
res_swir=(30, 30),
resamp_alg='nearest'
)
def test_transform_sensorgeo_VNIR_to_SWIR(self):
data_swir_sensorgeo = self.VS_SGT.transform_sensorgeo_VNIR_to_SWIR(self.data2transform_vnir_sensorgeo)
self.assertIsInstance(data_swir_sensorgeo, np.ndarray)
self.assertEquals(data_swir_sensorgeo.shape, self.data2transform_vnir_sensorgeo.shape)
def test_transform_sensorgeo_SWIR_to_VNIR(self):
data_vnir_sensorgeo = self.VS_SGT.transform_sensorgeo_SWIR_to_VNIR(self.data2transform_swir_sensorgeo)
self.assertIsInstance(data_vnir_sensorgeo, np.ndarray)
self.assertEquals(data_vnir_sensorgeo.shape, self.data2transform_vnir_sensorgeo.shape)
# GeoArray(data_vnir_sensorgeo).save('enpt_data_vnir_sensorgeo_nearest.bsq')
def test_transform_sensorgeo_SWIR_to_VNIR_3DInput_2DGeolayer(self):
data2transform_swir_sensorgeo_3D = np.dstack([self.data2transform_swir_sensorgeo] * 2)
data_vnir_sensorgeo = self.VS_SGT.transform_sensorgeo_SWIR_to_VNIR(data2transform_swir_sensorgeo_3D)
GeoArray(data_vnir_sensorgeo).save('/home/gfz-fe/scheffler/temp/enpt_3Ddata_vnir_sensorgeo_nearest.bsq')
def test_3D_geolayer(self):
with self.assertRaises(NotImplementedError):
VNIR_SWIR_SensorGeometryTransformer(lons_vnir=self.L1_obj.meta.vnir.lons,
lats_vnir=self.L1_obj.meta.vnir.lats,
lons_swir=self.L1_obj.meta.swir.lons,
lats_swir=self.L1_obj.meta.swir.lats,
prj_vnir=32632,
prj_swir=32632,
res_vnir=(30, 30),
res_swir=(30, 30),
)
class Test_RPC_Geolayer_Generator(TestCase):
def setUp(self):
with open(os.path.join(path_testdata, 'rpc_coeffs_B200.pkl'), 'rb') as pklF:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment