From 7110af862ee0aabeb3a12ae5d360860406cd3958 Mon Sep 17 00:00:00 2001 From: Daniel Scheffler Date: Fri, 7 Aug 2020 17:31:54 +0200 Subject: [PATCH 1/2] First part of the solution. Signed-off-by: Daniel Scheffler --- sensormapgeo/transformer_3d.py | 36 +++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/sensormapgeo/transformer_3d.py b/sensormapgeo/transformer_3d.py index b0076d5..f2ca161 100644 --- a/sensormapgeo/transformer_3d.py +++ b/sensormapgeo/transformer_3d.py @@ -96,19 +96,19 @@ class SensorMapGeometryTransformer3D(object): # bands: multiprocessing uses multiprocessing.Pool, implemented in to_map_geometry / to_sensor_geometry # tiles: multiprocessing uses OpenMP implemented in pykdtree which is used by pyresample self.opts['nprocs'] = opts.get('nprocs', multiprocessing.cpu_count()) + if self.opts['nprocs'] > multiprocessing.cpu_count(): + self.opts['nprocs'] = multiprocessing.cpu_count() self.mp_alg = ('bands' if self.lons.shape[2] >= opts['nprocs'] else 'tiles') if mp_alg == 'auto' else mp_alg @staticmethod def _to_map_geometry_2D(kwargs_dict: dict ) -> Tuple[np.ndarray, tuple, str, int]: - assert [var is not None for var in (_global_shared_lons, _global_shared_lats, _global_shared_data)] - - SMGT2D = SensorMapGeometryTransformer(lons=_global_shared_lons[:, :, kwargs_dict['band_idx']], - lats=_global_shared_lats[:, :, kwargs_dict['band_idx']], + SMGT2D = SensorMapGeometryTransformer(lons=kwargs_dict['lons'], + lats=kwargs_dict['lats'], resamp_alg=kwargs_dict['resamp_alg'], radius_of_influence=kwargs_dict['radius_of_influence'], **kwargs_dict['init_opts']) - data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry(data=_global_shared_data[:, :, kwargs_dict['band_idx']], + data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry(data=kwargs_dict['data'], area_definition=kwargs_dict['area_definition']) return data_mapgeo, out_gt, out_prj, kwargs_dict['band_idx'] @@ -214,16 +214,30 @@ class 
SensorMapGeometryTransformer3D(object): radius_of_influence=self.radius_of_influence, init_opts=init_opts, area_definition=area_definition, - band_idx=band + band_idx=band, + lons=self.lons[:, :, band], + lats=self.lats[:, :, band], + data=data[:, :, band], ) for band in range(data.shape[2])] if self.opts['nprocs'] > 1 and self.mp_alg == 'bands': - with multiprocessing.Pool(self.opts['nprocs'], - initializer=_initializer, - initargs=(self.lats, self.lons, data)) as pool: - result = pool.map(self._to_map_geometry_2D, args) + from time import time + t0 = time() + + # data_mapgeo_v2 = np.empty((area_definition.height, + # area_definition.width, + # data.shape[2]), + # dtype=data.dtype) + + with multiprocessing.Pool(self.opts['nprocs']) as pool: + result = [res for res in pool.imap_unordered(self._to_map_geometry_2D, args)] + + # for res in pool.imap_unordered(self._to_map_geometry_2D, args): + # data_mapgeo_v2[:, :, res[-1]] = res[0] + + print(time() - t0) + else: - _initializer(self.lats, self.lons, data) result = [self._to_map_geometry_2D(argsdict) for argsdict in args] band_inds = [res[-1] for res in result] -- GitLab From bb0ef7fae52c30700525ad9bf49783978cf799ae Mon Sep 17 00:00:00 2001 From: Daniel Scheffler Date: Fri, 7 Aug 2020 18:32:49 +0200 Subject: [PATCH 2/2] Revised the way how multiprocessing is called in the 3D transformer (replaced with pool.imap_unordered without initializer). This is as fast as before but has a much smaller memory consumption enabling the algorithm to also run on smaller machines while still highly benefiting from more CPUs. Fixes issue #5. Updated HISTORY.rst. 
Signed-off-by: Daniel Scheffler --- HISTORY.rst | 8 +++++ sensormapgeo/transformer_3d.py | 56 +++++++++------------------------- 2 files changed, 22 insertions(+), 42 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 725c6cd..ef61164 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -2,6 +2,14 @@ History ======= +0.4.0 (2020-08-07) +------------------ + +* Revised how multiprocessing is called in the 3D transformer (replaced with pool.imap_unordered without + initializer). This is as fast as before but has a much smaller memory consumption enabling the algorithm to also run + on smaller machines while still highly benefiting from more CPUs. Fixes issue #5. + + 0.3.5 (2020-08-07) ------------------ diff --git a/sensormapgeo/transformer_3d.py b/sensormapgeo/transformer_3d.py index f2ca161..10f539b 100644 --- a/sensormapgeo/transformer_3d.py +++ b/sensormapgeo/transformer_3d.py @@ -221,22 +221,12 @@ class SensorMapGeometryTransformer3D(object): ) for band in range(data.shape[2])] if self.opts['nprocs'] > 1 and self.mp_alg == 'bands': - from time import time - t0 = time() - - # data_mapgeo_v2 = np.empty((area_definition.height, - # area_definition.width, - # data.shape[2]), - # dtype=data.dtype) - + # NOTE: We use imap here as it directly returns the results when available (works like a generator). + # This saves a lot of memory compared with map. We also don't use an initializer to share the input + # arrays because this would allocate the memory for the input arrays of all bands at once. 
with multiprocessing.Pool(self.opts['nprocs']) as pool: result = [res for res in pool.imap_unordered(self._to_map_geometry_2D, args)] - # for res in pool.imap_unordered(self._to_map_geometry_2D, args): - # data_mapgeo_v2[:, :, res[-1]] = res[0] - - print(time() - t0) - else: result = [self._to_map_geometry_2D(argsdict) for argsdict in args] @@ -250,14 +240,12 @@ class SensorMapGeometryTransformer3D(object): @staticmethod def _to_sensor_geometry_2D(kwargs_dict: dict ) -> (np.ndarray, int): - assert [var is not None for var in (_global_shared_lons, _global_shared_lats, _global_shared_data)] - - SMGT2D = SensorMapGeometryTransformer(lons=_global_shared_lons[:, :, kwargs_dict['band_idx']], - lats=_global_shared_lats[:, :, kwargs_dict['band_idx']], + SMGT2D = SensorMapGeometryTransformer(lons=kwargs_dict['lons'], + lats=kwargs_dict['lats'], resamp_alg=kwargs_dict['resamp_alg'], radius_of_influence=kwargs_dict['radius_of_influence'], **kwargs_dict['init_opts']) - data_sensorgeo = SMGT2D.to_sensor_geometry(data=_global_shared_data[:, :, kwargs_dict['band_idx']], + data_sensorgeo = SMGT2D.to_sensor_geometry(data=kwargs_dict['data'], src_prj=kwargs_dict['src_prj'], src_extent=kwargs_dict['src_extent']) @@ -287,37 +275,21 @@ class SensorMapGeometryTransformer3D(object): init_opts=init_opts, src_prj=src_prj, src_extent=src_extent, - band_idx=band + band_idx=band, + lons=self.lons[:, :, band], + lats=self.lats[:, :, band], + data=data[:, :, band], + ) for band in range(data.shape[2])] if self.opts['nprocs'] > 1 and self.mp_alg == 'bands': - with multiprocessing.Pool(self.opts['nprocs'], - initializer=_initializer, - initargs=(self.lats, self.lons, data)) as pool: - result = pool.map(self._to_sensor_geometry_2D, args) + # NOTE: See the comments in the to_map_geometry() method. 
+ with multiprocessing.Pool(self.opts['nprocs']) as pool: + result = [res for res in pool.imap_unordered(self._to_sensor_geometry_2D, args)] else: - _initializer(self.lats, self.lons, data) result = [self._to_sensor_geometry_2D(argsdict) for argsdict in args] band_inds = list(np.array(result)[:, -1]) data_sensorgeo = np.dstack([result[band_inds.index(i)][0] for i in range(data.shape[2])]) return data_sensorgeo - - -_global_shared_lats = None -_global_shared_lons = None -_global_shared_data = None - - -def _initializer(lats, lons, data): - """Declare global variables needed for SensorMapGeometryTransformer3D.to_map_geometry and to_sensor_geometry. - - :param lats: - :param lons: - :param data: - """ - global _global_shared_lats, _global_shared_lons, _global_shared_data - _global_shared_lats = lats - _global_shared_lons = lons - _global_shared_data = data -- GitLab