Commit 0b65db60 authored by Daniel Scheffler
Browse files

Completed implementation to avoid OpenMP deadlock. Added pebble to pip dependencies.


Signed-off-by: Daniel Scheffler <danschef@gfz-potsdam.de>
parent ac380916
......@@ -26,7 +26,6 @@
from typing import Union, Tuple
import multiprocessing
from timeout_decorator import timeout
from concurrent.futures import TimeoutError as Timeout
from warnings import warn
import platform
......@@ -226,15 +225,20 @@ class SensorMapGeometryTransformer3D(object):
) for band in range(data.shape[2])]
if self.opts['nprocs'] > 1 and self.mp_alg == 'bands':
# NOTE: We use imap here as it directly returns the results when available (works like a generator).
# NOTE: The pebble ProcessPool directly returns the results when available (works like a generator).
# This saves a lot of memory compared with map. We also don't use an initializer to share the input
# arrays because this would allocate the memory for the input arrays of all bands at once.
# NOTE: Switch back to a multiprocessing imap iterator here once the OpenMP deadlock is finally fixed on the pyresample side:
# with multiprocessing.Pool(self.opts['nprocs']) as pool:
# return [res for res in pool.imap_unordered(self._to_map_geometry_2D, args)]
try:
# this may cause a deadlock with the GNU OpenMP build, thus each WORKER has a timeout of 10 seconds
with ProcessPool() as pool:
future = pool.map(self._to_map_geometry_2D, args, timeout=10)
result = [i for i in future.result()]
except (Timeout, ProcessExpired):
# use mp_alg='tiles' instead which uses OpenMP under the hood
msg = "Switched multiprocessing algorithm from 'bands' to 'tiles' due to a timeout in 'bands' mode. "
if platform.system() == 'Linux':
msg += "Consider using the LLVM instead of the GNU build of OpenMP to fix this issue (install, " \
......@@ -247,26 +251,6 @@ class SensorMapGeometryTransformer3D(object):
result = [self._to_map_geometry_2D(argsdict) for argsdict in args]
# from time import time
# t0 = time()
# self._to_map_geometry_2D(args[0])
# print(args[0]['data'].shape, time() - t0)
#
# ncols, nbands = data.shape[0], data.shape[2]
# nprocs = self.opts['nprocs'] if nbands > self.opts['nprocs'] else nbands
# max_seconds = None if self.mp_alg == 'tiles' else \
# (0.002 * ncols * nbands / nprocs * 10)
# print('timeout=%s' % max_seconds)
# @timeout(seconds=max_seconds, timeout_exception=TimeoutError)
# def run():
# with multiprocessing.Pool(self.opts['nprocs']) as pool:
# return [res for res in pool.imap_unordered(self._to_map_geometry_2D, args)]
#
# try:
# result = run()
# except TimeoutError:
# print('CATCHED!')
else:
result = [self._to_map_geometry_2D(argsdict) for argsdict in args]
......
......@@ -37,7 +37,7 @@ version = {}
with open("sensormapgeo/version.py") as version_file:
exec(version_file.read(), version)
requirements = ['numpy', 'gdal', 'pyresample>=1.11.0', 'py_tools_ds>=0.14.26', 'pyproj>=2.2']
requirements = ['numpy', 'gdal', 'pyresample>=1.11.0', 'py_tools_ds>=0.14.26', 'pyproj>=2.2', 'pebble']
setup_requirements = []
......@@ -46,7 +46,7 @@ test_requirements = ['coverage', 'nose', 'nose-htmloutput', 'rednose']
setup(
author="Daniel Scheffler",
author_email='daniel.scheffler@gfz-potsdam.de',
python_requires='>=3.5',
python_requires='>=3.6',
classifiers=[
'Development Status :: 2 - Pre-Alpha',
'Intended Audience :: Developers',
......
......@@ -20,6 +20,8 @@ dependencies:
- pip:
- py_tools_ds>=0.14.26
- pebble
- sphinx-argparse
- flake8
- pycodestyle
......
......@@ -226,37 +226,6 @@ class Test_SensorMapGeometryTransformer3D(TestCase):
rtol=0.01))
self.assertEqual(self.data_sensor_geo_3D.dtype, data_mapgeo_3D.dtype)
# def test_to_map_geometry_lonlat_3D_geolayer_bands(self):
# SMGT = SensorMapGeometryTransformer3D(lons=self.lons_3D,
# lats=self.lats_3D,
# # resamp_alg='nearest',
# resamp_alg='gauss',
# mp_alg='bands'
# )
#
# # to Lon/Lat
# data_mapgeo_3D, dem_gt, dem_prj = SMGT.to_map_geometry(self.data_sensor_geo_3D, tgt_prj=4326)
#
# # from geoarray import GeoArray
# # GeoArray(data_mapgeo_3D, dem_gt, dem_prj)\
# # .save(os.path.join(tests_path, 'test_output', 'resampled_3D_02_ll.bsq'))
#
# self.assertIsInstance(data_mapgeo_3D, np.ndarray)
# # only validate number of bands (height and width are validated in 2D version
# # fixed numbers may fail here due to float uncertainty errors
# self.assertGreater(np.dot(*data_mapgeo_3D.shape[:2]), np.dot(*self.data_sensor_geo_3D.shape[:2]))
# self.assertEqual(data_mapgeo_3D.shape[2], 2)
# xmin, xmax, ymin, ymax = corner_coord_to_minmax(get_corner_coordinates(gt=dem_gt,
# cols=data_mapgeo_3D.shape[1],
# rows=data_mapgeo_3D.shape[0]))
# self.assertTrue(False not in np.isclose(np.array([xmin, ymin, xmax, ymax]),
# np.array(self.expected_dem_area_extent_lonlat)))
#
# self.assertTrue(np.isclose(np.mean(data_mapgeo_3D[data_mapgeo_3D != 0]),
# np.mean(self.data_sensor_geo_3D),
# rtol=0.01))
# self.assertEqual(self.data_sensor_geo_3D.dtype, data_mapgeo_3D.dtype)
def test_to_map_geometry_utm_3D_geolayer(self):
for rsp_alg in rsp_algs:
for mp_alg in mp_algs:
......@@ -303,7 +272,8 @@ class Test_SensorMapGeometryTransformer3D(TestCase):
mp_alg=mp_alg
)
dem_sensors_geo = SMGT.to_sensor_geometry(self.data_map_geo_3D,
src_prj=32632, src_extent=self.dem_area_extent_coarse_subset_utm)
src_prj=32632,
src_extent=self.dem_area_extent_coarse_subset_utm)
self.assertIsInstance(dem_sensors_geo, np.ndarray)
self.assertEqual(dem_sensors_geo.shape, (150, 1000, 2))
self.assertEqual(self.data_map_geo_3D.dtype, dem_sensors_geo.dtype)
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment