Commit ac380916 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

First draft to avoid deadlock caused by an incompatible build of OpenMP.


Signed-off-by: Daniel Scheffler's avatarDaniel Scheffler <danschef@gfz-potsdam.de>
parent 55a65077
Pipeline #12316 failed with stage
in 33 seconds
...@@ -100,9 +100,8 @@ class SensorMapGeometryTransformer(object): ...@@ -100,9 +100,8 @@ class SensorMapGeometryTransformer(object):
# NOTE: If pykdtree is built with OpenMP support (default) the number of threads is controlled with the # NOTE: If pykdtree is built with OpenMP support (default) the number of threads is controlled with the
# standard OpenMP environment variable OMP_NUM_THREADS. The nprocs argument has no effect on pykdtree. # standard OpenMP environment variable OMP_NUM_THREADS. The nprocs argument has no effect on pykdtree.
os.environ['OMP_NUM_THREADS'] = '%d' % (self.opts['nprocs'] if 'nprocs' in self.opts else 1)
if 'nprocs' in self.opts: if 'nprocs' in self.opts:
if self.opts['nprocs'] > 1:
os.environ['OMP_NUM_THREADS'] = '%d' % opts['nprocs']
del self.opts['nprocs'] del self.opts['nprocs']
self.lats = lats self.lats = lats
......
...@@ -26,7 +26,12 @@ ...@@ -26,7 +26,12 @@
from typing import Union, Tuple from typing import Union, Tuple
import multiprocessing import multiprocessing
from timeout_decorator import timeout
from concurrent.futures import TimeoutError as Timeout
from warnings import warn
import platform
from pebble import ProcessPool, ProcessExpired
import numpy as np import numpy as np
from pyproj import CRS from pyproj import CRS
from py_tools_ds.geo.coord_trafo import transform_any_prj from py_tools_ds.geo.coord_trafo import transform_any_prj
...@@ -224,8 +229,43 @@ class SensorMapGeometryTransformer3D(object): ...@@ -224,8 +229,43 @@ class SensorMapGeometryTransformer3D(object):
# NOTE: We use imap here as it directly returns the results when available (works like a generator). # NOTE: We use imap here as it directly returns the results when available (works like a generator).
# This saves a lot of memory compared with map. We also don't use an initializer to share the input # This saves a lot of memory compared with map. We also don't use an initializer to share the input
# arrays because this would allocate the memory for the input arrays of all bands at once. # arrays because this would allocate the memory for the input arrays of all bands at once.
with multiprocessing.Pool(self.opts['nprocs']) as pool: try:
result = [res for res in pool.imap_unordered(self._to_map_geometry_2D, args)] with ProcessPool() as pool:
future = pool.map(self._to_map_geometry_2D, args, timeout=10)
result = [i for i in future.result()]
except (Timeout, ProcessExpired):
msg = "Switched multiprocessing algorithm from 'bands' to 'tiles' due to a timeout in 'bands' mode. "
if platform.system() == 'Linux':
msg += "Consider using the LLVM instead of the GNU build of OpenMP to fix this issue (install, " \
"e.g., by 'conda install -c conda-forge _openmp_mutex=*=1_llvm'."
warn(msg, RuntimeWarning)
init_opts = self.opts.copy()
for argset in args:
argset.update(dict(init_opts=init_opts))
result = [self._to_map_geometry_2D(argsdict) for argsdict in args]
# from time import time
# t0 = time()
# self._to_map_geometry_2D(args[0])
# print(args[0]['data'].shape, time() - t0)
#
# ncols, nbands = data.shape[0], data.shape[2]
# nprocs = self.opts['nprocs'] if nbands > self.opts['nprocs'] else nbands
# max_seconds = None if self.mp_alg == 'tiles' else \
# (0.002 * ncols * nbands / nprocs * 10)
# print('timeout=%s' % max_seconds)
# @timeout(seconds=max_seconds, timeout_exception=TimeoutError)
# def run():
# with multiprocessing.Pool(self.opts['nprocs']) as pool:
# return [res for res in pool.imap_unordered(self._to_map_geometry_2D, args)]
#
# try:
# result = run()
# except TimeoutError:
# print('CATCHED!')
else: else:
result = [self._to_map_geometry_2D(argsdict) for argsdict in args] result = [self._to_map_geometry_2D(argsdict) for argsdict in args]
......
...@@ -39,6 +39,7 @@ from sensormapgeo import SensorMapGeometryTransformer, SensorMapGeometryTransfor ...@@ -39,6 +39,7 @@ from sensormapgeo import SensorMapGeometryTransformer, SensorMapGeometryTransfor
tests_path = os.path.abspath(os.path.join(__path__[0], "..", "tests")) tests_path = os.path.abspath(os.path.join(__path__[0], "..", "tests"))
rsp_algs = ['nearest', 'bilinear', 'gauss'] rsp_algs = ['nearest', 'bilinear', 'gauss']
mp_algs = ['bands', 'tiles']
class Test_SensorMapGeometryTransformer(TestCase): class Test_SensorMapGeometryTransformer(TestCase):
...@@ -194,10 +195,12 @@ class Test_SensorMapGeometryTransformer3D(TestCase): ...@@ -194,10 +195,12 @@ class Test_SensorMapGeometryTransformer3D(TestCase):
def test_to_map_geometry_lonlat_3D_geolayer(self): def test_to_map_geometry_lonlat_3D_geolayer(self):
for rsp_alg in rsp_algs: for rsp_alg in rsp_algs:
for mp_alg in mp_algs:
SMGT = SensorMapGeometryTransformer3D(lons=self.lons_3D, SMGT = SensorMapGeometryTransformer3D(lons=self.lons_3D,
lats=self.lats_3D, lats=self.lats_3D,
# resamp_alg='nearest', # resamp_alg='nearest',
resamp_alg=rsp_alg, resamp_alg=rsp_alg,
mp_alg=mp_alg
) )
# to Lon/Lat # to Lon/Lat
...@@ -223,12 +226,45 @@ class Test_SensorMapGeometryTransformer3D(TestCase): ...@@ -223,12 +226,45 @@ class Test_SensorMapGeometryTransformer3D(TestCase):
rtol=0.01)) rtol=0.01))
self.assertEqual(self.data_sensor_geo_3D.dtype, data_mapgeo_3D.dtype) self.assertEqual(self.data_sensor_geo_3D.dtype, data_mapgeo_3D.dtype)
# def test_to_map_geometry_lonlat_3D_geolayer_bands(self):
# SMGT = SensorMapGeometryTransformer3D(lons=self.lons_3D,
# lats=self.lats_3D,
# # resamp_alg='nearest',
# resamp_alg='gauss',
# mp_alg='bands'
# )
#
# # to Lon/Lat
# data_mapgeo_3D, dem_gt, dem_prj = SMGT.to_map_geometry(self.data_sensor_geo_3D, tgt_prj=4326)
#
# # from geoarray import GeoArray
# # GeoArray(data_mapgeo_3D, dem_gt, dem_prj)\
# # .save(os.path.join(tests_path, 'test_output', 'resampled_3D_02_ll.bsq'))
#
# self.assertIsInstance(data_mapgeo_3D, np.ndarray)
# # only validate number of bands (height and width are validated in 2D version
# # fixed numbers may fail here due to float uncertainty errors
# self.assertGreater(np.dot(*data_mapgeo_3D.shape[:2]), np.dot(*self.data_sensor_geo_3D.shape[:2]))
# self.assertEqual(data_mapgeo_3D.shape[2], 2)
# xmin, xmax, ymin, ymax = corner_coord_to_minmax(get_corner_coordinates(gt=dem_gt,
# cols=data_mapgeo_3D.shape[1],
# rows=data_mapgeo_3D.shape[0]))
# self.assertTrue(False not in np.isclose(np.array([xmin, ymin, xmax, ymax]),
# np.array(self.expected_dem_area_extent_lonlat)))
#
# self.assertTrue(np.isclose(np.mean(data_mapgeo_3D[data_mapgeo_3D != 0]),
# np.mean(self.data_sensor_geo_3D),
# rtol=0.01))
# self.assertEqual(self.data_sensor_geo_3D.dtype, data_mapgeo_3D.dtype)
def test_to_map_geometry_utm_3D_geolayer(self): def test_to_map_geometry_utm_3D_geolayer(self):
for rsp_alg in rsp_algs: for rsp_alg in rsp_algs:
for mp_alg in mp_algs:
SMGT = SensorMapGeometryTransformer3D(lons=self.lons_3D, SMGT = SensorMapGeometryTransformer3D(lons=self.lons_3D,
lats=self.lats_3D, lats=self.lats_3D,
# resamp_alg='nearest', # resamp_alg='nearest',
resamp_alg=rsp_alg, resamp_alg=rsp_alg,
mp_alg=mp_alg
) )
# to Lon/Lat # to Lon/Lat
...@@ -260,9 +296,11 @@ class Test_SensorMapGeometryTransformer3D(TestCase): ...@@ -260,9 +296,11 @@ class Test_SensorMapGeometryTransformer3D(TestCase):
def test_to_sensor_geometry(self): def test_to_sensor_geometry(self):
for rsp_alg in rsp_algs: for rsp_alg in rsp_algs:
for mp_alg in mp_algs:
SMGT = SensorMapGeometryTransformer3D(lons=self.lons_3D, SMGT = SensorMapGeometryTransformer3D(lons=self.lons_3D,
lats=self.lats_3D, lats=self.lats_3D,
resamp_alg=rsp_alg, resamp_alg=rsp_alg,
mp_alg=mp_alg
) )
dem_sensors_geo = SMGT.to_sensor_geometry(self.data_map_geo_3D, dem_sensors_geo = SMGT.to_sensor_geometry(self.data_map_geo_3D,
src_prj=32632, src_extent=self.dem_area_extent_coarse_subset_utm) src_prj=32632, src_extent=self.dem_area_extent_coarse_subset_utm)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment