Commit c5756a9c authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Merge branch 'bugfix/fix_openmp_deadlock' into 'master'

Bugfix/fix openmp deadlock

Closes #6

See merge request !5
parents 55a65077 af8a3c14
Pipeline #12324 passed with stages
in 1 minute and 14 seconds
...@@ -56,7 +56,7 @@ test_sensormapgeo_install: ...@@ -56,7 +56,7 @@ test_sensormapgeo_install:
# resolve some requirements with conda # resolve some requirements with conda
# - conda config --set channel_priority strict # otherwise gdal or libgdal may be installed from defaults channel # - conda config --set channel_priority strict # otherwise gdal or libgdal may be installed from defaults channel
- conda install --yes -q -c conda-forge numpy gdal pyresample pyqt scikit-image pyproj lxml geopandas ipython - conda install --yes -q -c conda-forge numpy gdal pyresample pyqt scikit-image pyproj lxml geopandas ipython _openmp_mutex=*=*llvm*
# run installer # run installer
- python setup.py install - python setup.py install
......
...@@ -2,6 +2,14 @@ ...@@ -2,6 +2,14 @@
History History
======= =======
0.4.4 (2020-09-04)
------------------
* Fixed issue #6 (Deadlock within SensorMapGeometryTransformer3D when running in multiprocessing for resampling
algorithms 'near' and 'gauss'.)
* Added pebble to pip requirements.
0.4.3 (2020-09-02) 0.4.3 (2020-09-02)
------------------ ------------------
......
...@@ -100,9 +100,8 @@ class SensorMapGeometryTransformer(object): ...@@ -100,9 +100,8 @@ class SensorMapGeometryTransformer(object):
# NOTE: If pykdtree is built with OpenMP support (default) the number of threads is controlled with the # NOTE: If pykdtree is built with OpenMP support (default) the number of threads is controlled with the
# standard OpenMP environment variable OMP_NUM_THREADS. The nprocs argument has no effect on pykdtree. # standard OpenMP environment variable OMP_NUM_THREADS. The nprocs argument has no effect on pykdtree.
os.environ['OMP_NUM_THREADS'] = '%d' % (self.opts['nprocs'] if 'nprocs' in self.opts else 1)
if 'nprocs' in self.opts: if 'nprocs' in self.opts:
if self.opts['nprocs'] > 1:
os.environ['OMP_NUM_THREADS'] = '%d' % opts['nprocs']
del self.opts['nprocs'] del self.opts['nprocs']
self.lats = lats self.lats = lats
......
...@@ -26,7 +26,11 @@ ...@@ -26,7 +26,11 @@
from typing import Union, Tuple from typing import Union, Tuple
import multiprocessing import multiprocessing
from concurrent.futures import TimeoutError as Timeout
from warnings import warn
import platform
from pebble import ProcessPool, ProcessExpired
import numpy as np import numpy as np
from pyproj import CRS from pyproj import CRS
from py_tools_ds.geo.coord_trafo import transform_any_prj from py_tools_ds.geo.coord_trafo import transform_any_prj
...@@ -221,11 +225,31 @@ class SensorMapGeometryTransformer3D(object): ...@@ -221,11 +225,31 @@ class SensorMapGeometryTransformer3D(object):
) for band in range(data.shape[2])] ) for band in range(data.shape[2])]
if self.opts['nprocs'] > 1 and self.mp_alg == 'bands': if self.opts['nprocs'] > 1 and self.mp_alg == 'bands':
# NOTE: We use imap here as it directly returns the results when available (works like a generator). # NOTE: The pebble ProcessPool directly returns the results when available (works like a generator).
# This saves a lot of memory compared with map. We also don't use an initializer to share the input # This saves a lot of memory compared with map. We also don't use an initializer to share the input
# arrays because this would allocate the memory for the input arrays of all bands at once. # arrays because this would allocate the memory for the input arrays of all bands at once.
with multiprocessing.Pool(self.opts['nprocs']) as pool: # NOTE: Use a multiprocessing imap iterator here when the OpenMP is finally fixed in the pyresample side:
result = [res for res in pool.imap_unordered(self._to_map_geometry_2D, args)] # with multiprocessing.Pool(self.opts['nprocs']) as pool:
# return [res for res in pool.imap_unordered(self._to_map_geometry_2D, args)]
try:
# this may cause a deadlock with the GNU OpenMP build, thus each WORKER has a timeout of 10 seconds
with ProcessPool() as pool:
future = pool.map(self._to_map_geometry_2D, args, timeout=10)
result = [i for i in future.result()]
except (Timeout, ProcessExpired):
# use mp_alg='tiles' instead which uses OpenMP under the hood
msg = "Switched multiprocessing algorithm from 'bands' to 'tiles' due to a timeout in 'bands' mode. "
if platform.system() == 'Linux':
msg += "Consider using the LLVM instead of the GNU build of OpenMP to fix this issue (install, " \
"e.g., by 'conda install -c conda-forge _openmp_mutex=*=1_llvm'."
warn(msg, RuntimeWarning)
init_opts = self.opts.copy()
for argset in args:
argset.update(dict(init_opts=init_opts))
result = [self._to_map_geometry_2D(argsdict) for argsdict in args]
else: else:
result = [self._to_map_geometry_2D(argsdict) for argsdict in args] result = [self._to_map_geometry_2D(argsdict) for argsdict in args]
......
...@@ -22,6 +22,6 @@ ...@@ -22,6 +22,6 @@
# You should have received a copy of the GNU Lesser General Public License along # You should have received a copy of the GNU Lesser General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>. # with this program. If not, see <http://www.gnu.org/licenses/>.
__version__ = '0.4.3' __version__ = '0.4.4'
__versionalias__ = '20200902.01' __versionalias__ = '20200904.01'
__author__ = 'Daniel Scheffler' __author__ = 'Daniel Scheffler'
...@@ -37,7 +37,7 @@ version = {} ...@@ -37,7 +37,7 @@ version = {}
with open("sensormapgeo/version.py") as version_file: with open("sensormapgeo/version.py") as version_file:
exec(version_file.read(), version) exec(version_file.read(), version)
requirements = ['numpy', 'gdal', 'pyresample>=1.11.0', 'py_tools_ds>=0.14.26', 'pyproj>=2.2'] requirements = ['numpy', 'gdal', 'pyresample>=1.11.0', 'py_tools_ds>=0.14.26', 'pyproj>=2.2', 'pebble']
setup_requirements = [] setup_requirements = []
...@@ -46,7 +46,7 @@ test_requirements = ['coverage', 'nose', 'nose-htmloutput', 'rednose'] ...@@ -46,7 +46,7 @@ test_requirements = ['coverage', 'nose', 'nose-htmloutput', 'rednose']
setup( setup(
author="Daniel Scheffler", author="Daniel Scheffler",
author_email='daniel.scheffler@gfz-potsdam.de', author_email='daniel.scheffler@gfz-potsdam.de',
python_requires='>=3.5', python_requires='>=3.6',
classifiers=[ classifiers=[
'Development Status :: 2 - Pre-Alpha', 'Development Status :: 2 - Pre-Alpha',
'Intended Audience :: Developers', 'Intended Audience :: Developers',
......
...@@ -9,6 +9,7 @@ dependencies: ...@@ -9,6 +9,7 @@ dependencies:
- numpy - numpy
- gdal - gdal
- pyresample>=1.11.0 - pyresample>=1.11.0
- _openmp_mutex=*=*llvm* [Unix] # fixes a deadlock (https://gitext.gfz-potsdam.de/EnMAP/sensormapgeo/-/issues/6)
- pyproj>=2.2 - pyproj>=2.2
# py_tools_ds # py_tools_ds
...@@ -20,6 +21,8 @@ dependencies: ...@@ -20,6 +21,8 @@ dependencies:
- pip: - pip:
- py_tools_ds>=0.14.26 - py_tools_ds>=0.14.26
- pebble
- sphinx-argparse - sphinx-argparse
- flake8 - flake8
- pycodestyle - pycodestyle
......
...@@ -39,6 +39,7 @@ from sensormapgeo import SensorMapGeometryTransformer, SensorMapGeometryTransfor ...@@ -39,6 +39,7 @@ from sensormapgeo import SensorMapGeometryTransformer, SensorMapGeometryTransfor
tests_path = os.path.abspath(os.path.join(__path__[0], "..", "tests")) tests_path = os.path.abspath(os.path.join(__path__[0], "..", "tests"))
rsp_algs = ['nearest', 'bilinear', 'gauss'] rsp_algs = ['nearest', 'bilinear', 'gauss']
mp_algs = ['bands', 'tiles']
class Test_SensorMapGeometryTransformer(TestCase): class Test_SensorMapGeometryTransformer(TestCase):
...@@ -194,78 +195,85 @@ class Test_SensorMapGeometryTransformer3D(TestCase): ...@@ -194,78 +195,85 @@ class Test_SensorMapGeometryTransformer3D(TestCase):
def test_to_map_geometry_lonlat_3D_geolayer(self): def test_to_map_geometry_lonlat_3D_geolayer(self):
for rsp_alg in rsp_algs: for rsp_alg in rsp_algs:
SMGT = SensorMapGeometryTransformer3D(lons=self.lons_3D, for mp_alg in mp_algs:
lats=self.lats_3D, SMGT = SensorMapGeometryTransformer3D(lons=self.lons_3D,
# resamp_alg='nearest', lats=self.lats_3D,
resamp_alg=rsp_alg, # resamp_alg='nearest',
) resamp_alg=rsp_alg,
mp_alg=mp_alg
# to Lon/Lat )
data_mapgeo_3D, dem_gt, dem_prj = SMGT.to_map_geometry(self.data_sensor_geo_3D, tgt_prj=4326)
# to Lon/Lat
# from geoarray import GeoArray data_mapgeo_3D, dem_gt, dem_prj = SMGT.to_map_geometry(self.data_sensor_geo_3D, tgt_prj=4326)
# GeoArray(data_mapgeo_3D, dem_gt, dem_prj)\
# .save(os.path.join(tests_path, 'test_output', 'resampled_3D_02_ll.bsq')) # from geoarray import GeoArray
# GeoArray(data_mapgeo_3D, dem_gt, dem_prj)\
self.assertIsInstance(data_mapgeo_3D, np.ndarray) # .save(os.path.join(tests_path, 'test_output', 'resampled_3D_02_ll.bsq'))
# only validate number of bands (height and width are validated in 2D version
# fixed numbers may fail here due to float uncertainty errors self.assertIsInstance(data_mapgeo_3D, np.ndarray)
self.assertGreater(np.dot(*data_mapgeo_3D.shape[:2]), np.dot(*self.data_sensor_geo_3D.shape[:2])) # only validate number of bands (height and width are validated in 2D version
self.assertEqual(data_mapgeo_3D.shape[2], 2) # fixed numbers may fail here due to float uncertainty errors
xmin, xmax, ymin, ymax = corner_coord_to_minmax(get_corner_coordinates(gt=dem_gt, self.assertGreater(np.dot(*data_mapgeo_3D.shape[:2]), np.dot(*self.data_sensor_geo_3D.shape[:2]))
cols=data_mapgeo_3D.shape[1], self.assertEqual(data_mapgeo_3D.shape[2], 2)
rows=data_mapgeo_3D.shape[0])) xmin, xmax, ymin, ymax = corner_coord_to_minmax(get_corner_coordinates(gt=dem_gt,
self.assertTrue(False not in np.isclose(np.array([xmin, ymin, xmax, ymax]), cols=data_mapgeo_3D.shape[1],
np.array(self.expected_dem_area_extent_lonlat))) rows=data_mapgeo_3D.shape[0]))
self.assertTrue(False not in np.isclose(np.array([xmin, ymin, xmax, ymax]),
self.assertTrue(np.isclose(np.mean(data_mapgeo_3D[data_mapgeo_3D != 0]), np.array(self.expected_dem_area_extent_lonlat)))
np.mean(self.data_sensor_geo_3D),
rtol=0.01)) self.assertTrue(np.isclose(np.mean(data_mapgeo_3D[data_mapgeo_3D != 0]),
self.assertEqual(self.data_sensor_geo_3D.dtype, data_mapgeo_3D.dtype) np.mean(self.data_sensor_geo_3D),
rtol=0.01))
self.assertEqual(self.data_sensor_geo_3D.dtype, data_mapgeo_3D.dtype)
def test_to_map_geometry_utm_3D_geolayer(self): def test_to_map_geometry_utm_3D_geolayer(self):
for rsp_alg in rsp_algs: for rsp_alg in rsp_algs:
SMGT = SensorMapGeometryTransformer3D(lons=self.lons_3D, for mp_alg in mp_algs:
lats=self.lats_3D, SMGT = SensorMapGeometryTransformer3D(lons=self.lons_3D,
# resamp_alg='nearest', lats=self.lats_3D,
resamp_alg=rsp_alg, # resamp_alg='nearest',
) resamp_alg=rsp_alg,
mp_alg=mp_alg
# to Lon/Lat )
data_mapgeo_3D, dem_gt, dem_prj = SMGT.to_map_geometry(self.data_sensor_geo_3D,
tgt_prj=32632, # to Lon/Lat
tgt_res=(30, 30), data_mapgeo_3D, dem_gt, dem_prj = SMGT.to_map_geometry(self.data_sensor_geo_3D,
# tgt_extent=self.expected_dem_area_extent_utm, tgt_prj=32632,
tgt_coordgrid=((0, 30), (0, 30)) tgt_res=(30, 30),
) # tgt_extent=self.expected_dem_area_extent_utm,
# from geoarray import GeoArray tgt_coordgrid=((0, 30), (0, 30))
# GeoArray(data_mapgeo_3D, dem_gt, dem_prj)\ )
# .save(os.path.join(tests_path, 'test_output', 'resampled_3D_02_pyresample.bsq')) # from geoarray import GeoArray
# GeoArray(data_mapgeo_3D, dem_gt, dem_prj)\
self.assertIsInstance(data_mapgeo_3D, np.ndarray) # .save(os.path.join(tests_path, 'test_output', 'resampled_3D_02_pyresample.bsq'))
# only validate number of bands (height and width are validated in 2D version
# fixed numbers may fail here due to float uncertainty errors self.assertIsInstance(data_mapgeo_3D, np.ndarray)
self.assertGreater(np.dot(*data_mapgeo_3D.shape[:2]), np.dot(*self.data_sensor_geo_3D.shape[:2])) # only validate number of bands (height and width are validated in 2D version
self.assertEqual(data_mapgeo_3D.shape[2], 2) # fixed numbers may fail here due to float uncertainty errors
xmin, xmax, ymin, ymax = corner_coord_to_minmax(get_corner_coordinates(gt=dem_gt, self.assertGreater(np.dot(*data_mapgeo_3D.shape[:2]), np.dot(*self.data_sensor_geo_3D.shape[:2]))
cols=data_mapgeo_3D.shape[1], self.assertEqual(data_mapgeo_3D.shape[2], 2)
rows=data_mapgeo_3D.shape[0])) xmin, xmax, ymin, ymax = corner_coord_to_minmax(get_corner_coordinates(gt=dem_gt,
self.assertTrue(False not in np.isclose(np.array([xmin, ymin, xmax, ymax]), cols=data_mapgeo_3D.shape[1],
np.array(self.expected_dem_area_extent_utm_ongrid))) rows=data_mapgeo_3D.shape[0]))
self.assertTrue(False not in np.isclose(np.array([xmin, ymin, xmax, ymax]),
self.assertTrue(np.isclose(np.mean(data_mapgeo_3D[data_mapgeo_3D != 0]), np.array(self.expected_dem_area_extent_utm_ongrid)))
np.mean(self.data_sensor_geo_3D),
rtol=0.01)) self.assertTrue(np.isclose(np.mean(data_mapgeo_3D[data_mapgeo_3D != 0]),
self.assertEqual(self.data_sensor_geo_3D.dtype, data_mapgeo_3D.dtype) np.mean(self.data_sensor_geo_3D),
rtol=0.01))
self.assertEqual(self.data_sensor_geo_3D.dtype, data_mapgeo_3D.dtype)
def test_to_sensor_geometry(self): def test_to_sensor_geometry(self):
for rsp_alg in rsp_algs: for rsp_alg in rsp_algs:
SMGT = SensorMapGeometryTransformer3D(lons=self.lons_3D, for mp_alg in mp_algs:
lats=self.lats_3D, SMGT = SensorMapGeometryTransformer3D(lons=self.lons_3D,
resamp_alg=rsp_alg, lats=self.lats_3D,
) resamp_alg=rsp_alg,
dem_sensors_geo = SMGT.to_sensor_geometry(self.data_map_geo_3D, mp_alg=mp_alg
src_prj=32632, src_extent=self.dem_area_extent_coarse_subset_utm) )
self.assertIsInstance(dem_sensors_geo, np.ndarray) dem_sensors_geo = SMGT.to_sensor_geometry(self.data_map_geo_3D,
self.assertEqual(dem_sensors_geo.shape, (150, 1000, 2)) src_prj=32632,
self.assertEqual(self.data_map_geo_3D.dtype, dem_sensors_geo.dtype) src_extent=self.dem_area_extent_coarse_subset_utm)
self.assertIsInstance(dem_sensors_geo, np.ndarray)
self.assertEqual(dem_sensors_geo.shape, (150, 1000, 2))
self.assertEqual(self.data_map_geo_3D.dtype, dem_sensors_geo.dtype)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment