Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Daniel Scheffler
py_tools_ds
Commits
8b6428e7
Commit
8b6428e7
authored
Jan 08, 2019
by
Daniel Scheffler
Browse files
Added SensorMapGeometryTransformer3D + tests.
parent
5007dbc2
Pipeline
#3561
failed with stages
in 60 minutes and 42 seconds
Changes
2
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
py_tools_ds/geo/raster/reproject.py
View file @
8b6428e7
...
...
@@ -502,8 +502,8 @@ class SensorMapGeometryTransformer(object):
# type: (np.ndarray, np.ndarray, str, int, Any) -> None
"""Get an instance of SensorMapGeometryTransformer.
:param lons: 2D longitude array corresponding to the sensor geometry array
:param lats: 2D latitude array corresponding to the sensor geometry array
:param lons: 2D longitude array corresponding to the
2D
sensor geometry array
:param lats: 2D latitude array corresponding to the
2D
sensor geometry array
:Keyword Arguments: (further documentation here: https://pyresample.readthedocs.io/en/latest/swath.html)
- resamp_alg: resampling algorithm ('nearest', 'bilinear', 'gauss', 'custom')
...
...
@@ -529,10 +529,10 @@ class SensorMapGeometryTransformer(object):
NOTE: resampling function has 3 return values instead of 1: result, stddev, count
"""
# validation
if
lons
.
ndim
>
2
:
raise
NotImplementedError
(
lons
,
'%dD longitude arrays are not yet supported
.'
%
lons
.
ndim
)
if
lats
.
ndim
>
2
:
raise
NotImplementedError
(
lats
,
'%dD latitude arrays are not yet supported
.'
%
lats
.
ndim
)
if
lons
.
ndim
!=
2
:
raise
ValueError
(
'Expected a 2D longitude array. Received a %dD array
.'
%
lons
.
ndim
)
if
lats
.
ndim
!=
2
:
raise
ValueError
(
'Expected a 2D latitude array. Received a %dD array
.'
%
lats
.
ndim
)
if
lons
.
shape
!=
lats
.
shape
:
raise
ValueError
((
lons
.
shape
,
lats
.
shape
),
"'lons' and 'lats' are expected to have the same shape."
)
...
...
@@ -783,3 +783,167 @@ class SensorMapGeometryTransformer(object):
%
(
self
.
lats
.
shape
,
data_sensorgeo
.
shape
))
return
data_sensorgeo
class
SensorMapGeometryTransformer3D
(
object
):
def
__init__
(
self
,
lons
,
lats
,
resamp_alg
=
'nearest'
,
radius_of_influence
=
30
,
**
opts
):
# type: (np.ndarray, np.ndarray, str, int, Any) -> None
"""Get an instance of SensorMapGeometryTransformer.
:param lons: 3D longitude array corresponding to the 3D sensor geometry array
:param lats: 3D latitude array corresponding to the 3D sensor geometry array
:Keyword Arguments: (further documentation here: https://pyresample.readthedocs.io/en/latest/swath.html)
- resamp_alg: resampling algorithm ('nearest', 'bilinear', 'gauss', 'custom')
- radius_of_influence: <float> Cut off distance in meters (default: 30)
NOTE: keyword is named 'radius' in case of bilinear resampling
- sigmas: <list of floats or float> [ONLY 'gauss'] List of sigmas to use for the gauss
weighting of each channel 1 to k, w_k = exp(-dist^2/sigma_k^2). If only one channel
is resampled sigmas is a single float value.
- neighbours: <int> [ONLY 'bilinear', 'gauss'] Number of neighbours to consider for each grid
point when searching the closest corner points
- epsilon: <float> Allowed uncertainty in meters. Increasing uncertainty reduces execution time
- weight_funcs: <list of function objects or function object> [ONLY 'custom'] List of weight
functions f(dist) to use for the weighting of each channel 1 to k. If only one
channel is resampled weight_funcs is a single function object.
- fill_value: <int or None> Set undetermined pixels to this value.
If fill_value is None a masked array is returned with undetermined pixels masked
- reduce_data: <bool> Perform initial coarse reduction of source dataset in order to reduce
execution time
- nprocs: <int>, Number of processor cores to be used
- segments: <int or None> Number of segments to use when resampling.
If set to None an estimate will be calculated
- with_uncert: <bool> [ONLY 'gauss' and 'custom'] Calculate uncertainty estimates
NOTE: resampling function has 3 return values instead of 1: result, stddev, count
"""
# validation
if
lons
.
ndim
!=
3
:
raise
ValueError
(
'Expected a 3D longitude array. Received a %dD array.'
%
lons
.
ndim
)
if
lats
.
ndim
!=
3
:
raise
ValueError
(
'Expected a 3D latitude array. Received a %dD array.'
%
lats
.
ndim
)
if
lons
.
shape
!=
lats
.
shape
:
raise
ValueError
((
lons
.
shape
,
lats
.
shape
),
"'lons' and 'lats' are expected to have the same shape."
)
self
.
lats
=
lats
self
.
lons
=
lons
self
.
resamp_alg
=
resamp_alg
self
.
radius_of_influence
=
radius_of_influence
self
.
opts
=
opts
self
.
CPUs
=
opts
[
'nprocs'
]
if
'nprocs'
in
opts
and
opts
[
'nprocs'
]
else
multiprocessing
.
cpu_count
()
@
staticmethod
def
_to_map_geometry_2D
(
kwargs_dict
):
# type: (dict) -> Tuple[np.ndarray, tuple, str, int]
SMGT2D
=
SensorMapGeometryTransformer
(
lons
=
kwargs_dict
[
'lons_2D'
],
lats
=
kwargs_dict
[
'lats_2D'
],
resamp_alg
=
kwargs_dict
[
'resamp_alg'
],
radius_of_influence
=
kwargs_dict
[
'radius_of_influence'
],
**
kwargs_dict
[
'init_opts'
])
data_mapgeo
,
out_gt
,
out_prj
=
SMGT2D
.
to_map_geometry
(
data
=
kwargs_dict
[
'data_sensor_geo_2D'
],
tgt_prj
=
kwargs_dict
[
'tgt_prj'
],
tgt_extent
=
kwargs_dict
[
'tgt_extent'
],
tgt_res
=
kwargs_dict
[
'tgt_res'
])
return
data_mapgeo
,
out_gt
,
out_prj
,
kwargs_dict
[
'band_idx'
]
def
_get_common_target_extent
(
self
,
tgt_epsg
):
corner_coords_ll
=
[[
self
.
lons
[
0
,
0
,
:].
min
(),
self
.
lats
[
0
,
0
,
:].
max
()],
# common UL_xy
[
self
.
lons
[
0
,
-
1
,
:].
max
(),
self
.
lats
[
0
,
-
1
,
:].
max
()],
# common UR_xy
[
self
.
lons
[
-
1
,
0
,
:].
min
(),
self
.
lats
[
-
1
,
0
,
:].
min
()],
# common LL_xy
[
self
.
lons
[
-
1
,
-
1
,
:].
max
(),
self
.
lats
[
-
1
,
-
1
,
:].
min
()],
# common LR_xy
]
corner_coords_tgt_prj
=
[
transform_any_prj
(
EPSG2WKT
(
4326
),
EPSG2WKT
(
tgt_epsg
),
x
,
y
)
for
x
,
y
in
corner_coords_ll
]
corner_coords_tgt_prj_np
=
np
.
array
(
corner_coords_tgt_prj
)
x_coords
,
y_coords
=
corner_coords_tgt_prj_np
[:,
0
],
corner_coords_tgt_prj_np
[:,
1
]
tgt_extent
=
[
np
.
min
(
x_coords
),
np
.
min
(
y_coords
),
np
.
max
(
x_coords
),
np
.
max
(
y_coords
)]
return
tgt_extent
def
to_map_geometry
(
self
,
data
,
tgt_prj
,
tgt_extent
=
None
,
tgt_res
=
None
):
# type: (np.ndarray, Union[str, int], Tuple[float, float, float, float], Tuple) -> ...
"""Transform the input sensor geometry array into map geometry.
:param data: 3D numpy array (representing sensor geometry) to be warped to map geometry
:param tgt_prj: target projection (WKT or 'epsg:1234' or <EPSG_int>)
:param tgt_extent: extent coordinates of output map geometry array (LL_x, LL_y, UR_x, UR_y) in the tgt_prj
:param tgt_res: target X/Y resolution (e.g., (30, 30))
"""
if
data
.
ndim
!=
3
:
raise
ValueError
(
data
.
ndim
,
"'data' must have 3 dimensions."
)
if
data
.
shape
!=
self
.
lons
.
shape
:
raise
ValueError
(
data
.
shape
,
'Expected a sensor geometry data array with %d rows, %d columns and %d bands.'
%
self
.
lons
.
shape
)
# get common target extent
tgt_epsg
=
WKT2EPSG
(
proj4_to_WKT
(
get_proj4info
(
proj
=
tgt_prj
)))
tgt_extent
=
tgt_extent
or
self
.
_get_common_target_extent
(
tgt_epsg
)
args
=
[
dict
(
lons_2D
=
self
.
lons
[:,
:,
band
],
lats_2D
=
self
.
lats
[:,
:,
band
],
resamp_alg
=
self
.
resamp_alg
,
radius_of_influence
=
self
.
radius_of_influence
,
init_opts
=
self
.
opts
,
data_sensor_geo_2D
=
data
[:,
:,
band
],
tgt_prj
=
tgt_prj
,
tgt_extent
=
tgt_extent
,
tgt_res
=
tgt_res
,
band_idx
=
band
)
for
band
in
range
(
data
.
shape
[
2
])]
with
multiprocessing
.
Pool
(
self
.
CPUs
)
as
pool
:
result
=
pool
.
map
(
self
.
_to_map_geometry_2D
,
args
)
band_inds
=
list
(
np
.
array
(
result
)[:,
-
1
])
data_mapgeo
=
np
.
dstack
((
result
[
band_inds
.
index
(
i
)][
0
]
for
i
in
range
(
data
.
shape
[
2
])))
out_gt
=
result
[
0
][
1
]
out_prj
=
result
[
0
][
1
]
return
data_mapgeo
,
out_gt
,
out_prj
# type: Tuple[np.ndarray, tuple, str]
@
staticmethod
def
_to_sensor_geometry_2D
(
kwargs_dict
):
# type: (dict) -> (np.ndarray, int)
SMGT2D
=
SensorMapGeometryTransformer
(
lons
=
kwargs_dict
[
'lons_2D'
],
lats
=
kwargs_dict
[
'lats_2D'
],
resamp_alg
=
kwargs_dict
[
'resamp_alg'
],
radius_of_influence
=
kwargs_dict
[
'radius_of_influence'
],
**
kwargs_dict
[
'init_opts'
])
data_sensorgeo
=
SMGT2D
.
to_sensor_geometry
(
data
=
kwargs_dict
[
'data_map_geo_2D'
],
src_prj
=
kwargs_dict
[
'src_prj'
],
src_extent
=
kwargs_dict
[
'src_extent'
])
return
data_sensorgeo
,
kwargs_dict
[
'band_idx'
]
def
to_sensor_geometry
(
self
,
data
,
src_prj
,
src_extent
):
# type: (np.ndarray, Union[str, int], List[float, float, float, float]) -> np.ndarray
"""Transform the input map geometry array into sensor geometry
:param data: 3D numpy array (representing map geometry) to be warped to sensor geometry
:param src_prj: projection of the input map geometry array (WKT or 'epsg:1234' or <EPSG_int>)
:param src_extent: extent coordinates of input map geometry array (LL_x, LL_y, UR_x, UR_y) in the src_prj
"""
if
data
.
ndim
!=
3
:
raise
ValueError
(
data
.
ndim
,
"'data' must have 3 dimensions."
)
args
=
[
dict
(
lons_2D
=
self
.
lons
[:,
:,
band
],
lats_2D
=
self
.
lats
[:,
:,
band
],
resamp_alg
=
self
.
resamp_alg
,
radius_of_influence
=
self
.
radius_of_influence
,
init_opts
=
self
.
opts
,
data_map_geo_2D
=
data
[:,
:,
band
],
src_prj
=
src_prj
,
src_extent
=
src_extent
,
band_idx
=
band
)
for
band
in
range
(
data
.
shape
[
2
])]
with
multiprocessing
.
Pool
(
self
.
CPUs
)
as
pool
:
result
=
pool
.
map
(
self
.
_to_sensor_geometry_2D
,
args
)
band_inds
=
list
(
np
.
array
(
result
)[:,
-
1
])
data_sensorgeo
=
np
.
dstack
((
result
[
band_inds
.
index
(
i
)][
0
]
for
i
in
range
(
data
.
shape
[
2
])))
return
data_sensorgeo
tests/test_geo/test_raster/test_reproject.py
View file @
8b6428e7
...
...
@@ -16,7 +16,7 @@ from gdalnumeric import LoadFile
from
py_tools_ds
import
__path__
from
py_tools_ds.geo.coord_calc
import
corner_coord_to_minmax
,
get_corner_coordinates
from
py_tools_ds.geo.raster.reproject
import
SensorMapGeometryTransformer
from
py_tools_ds.geo.raster.reproject
import
SensorMapGeometryTransformer
,
SensorMapGeometryTransformer3D
tests_path
=
os
.
path
.
abspath
(
os
.
path
.
join
(
__path__
[
0
],
".."
,
"tests"
))
...
...
@@ -84,3 +84,55 @@ class Test_SensorMapGeometryTransformer(TestCase):
rows
=
dem_map_geo
.
shape
[
0
]))
self
.
assertTrue
(
False
not
in
np
.
isclose
(
np
.
array
([
xmin
,
ymin
,
xmax
,
ymax
]),
np
.
array
(
self
.
expected_dem_area_extent_utm
)))
class
Test_SensorMapGeometryTransformer3D
(
TestCase
):
def
setUp
(
self
):
dem_map_geo
=
LoadFile
(
os
.
path
.
join
(
tests_path
,
'data'
,
'dem_map_geo.bsq'
))
dem_sensor_geo
=
LoadFile
(
os
.
path
.
join
(
tests_path
,
'data'
,
'dem_sensor_geo.bsq'
))
lons
=
LoadFile
(
os
.
path
.
join
(
tests_path
,
'data'
,
'lons_full_vnir.bsq'
))
lats
=
LoadFile
(
os
.
path
.
join
(
tests_path
,
'data'
,
'lats_full_vnir.bsq'
))
self
.
data_map_geo_3D
=
np
.
dstack
([
dem_map_geo
,
dem_map_geo
])
self
.
data_sensor_geo_3D
=
np
.
dstack
([
dem_sensor_geo
,
dem_sensor_geo
])
self
.
lons_3D
=
np
.
dstack
([
lons
,
lons
])
# TODO use different lons per band here
self
.
lats_3D
=
np
.
dstack
([
lats
,
lats
])
# TODO use different lats per band here
self
.
dem_area_extent_coarse_subset_utm
=
[
622613.864409047
,
# LL_x
5254111.40255343
,
# LL_x
660473.864409047
,
# LL_x
5269351.40255343
]
# UR_y
self
.
expected_dem_area_extent_lonlat
=
[
10.685733901515151
,
# LL_x
47.44113415492957
,
# LL_y
11.073066098484848
,
# UR_x
47.54576584507042
]
# UR_y
self
.
expected_dem_area_extent_utm
=
[
626938.928052
,
# LL_x
5256253.56579
,
# LL_y
656188.928052
,
# UR_x
5267203.56579
]
# UR_y
def
test_to_map_geometry_lonlat_3D_geolayer
(
self
):
SMGT
=
SensorMapGeometryTransformer3D
(
lons
=
self
.
lons_3D
,
lats
=
self
.
lats_3D
,
resamp_alg
=
'nearest'
)
# to Lon/Lat
data_mapgeo_3D
,
dem_gt
,
dem_prj
=
SMGT
.
to_map_geometry
(
self
.
data_sensor_geo_3D
,
tgt_prj
=
4326
)
self
.
assertIsInstance
(
data_mapgeo_3D
,
np
.
ndarray
)
self
.
assertEquals
(
data_mapgeo_3D
.
shape
,
(
286
,
1058
,
2
))
xmin
,
xmax
,
ymin
,
ymax
=
corner_coord_to_minmax
(
get_corner_coordinates
(
gt
=
dem_gt
,
cols
=
data_mapgeo_3D
.
shape
[
1
],
rows
=
data_mapgeo_3D
.
shape
[
0
]))
self
.
assertTrue
(
False
not
in
np
.
isclose
(
np
.
array
([
xmin
,
ymin
,
xmax
,
ymax
]),
np
.
array
(
self
.
expected_dem_area_extent_lonlat
)))
def
test_to_sensor_geometry
(
self
):
SMGT
=
SensorMapGeometryTransformer3D
(
lons
=
self
.
lons_3D
,
lats
=
self
.
lats_3D
,
resamp_alg
=
'nearest'
)
dem_sensors_geo
=
SMGT
.
to_sensor_geometry
(
self
.
data_map_geo_3D
,
src_prj
=
32632
,
src_extent
=
self
.
dem_area_extent_coarse_subset_utm
)
self
.
assertIsInstance
(
dem_sensors_geo
,
np
.
ndarray
)
self
.
assertEquals
(
dem_sensors_geo
.
shape
,
(
150
,
1000
,
2
))
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment