diff --git a/HISTORY.rst b/HISTORY.rst index 725c6cd1ff0ba76f0a4b45fc2544d6e7a741595f..ef611641aea71d31a5559da80cf24a05169dfc20 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -2,6 +2,14 @@ History ======= +0.4.0 (2020-08-07) +------------------ + +* Revised how multiprocessing is called in the 3D transformer (replaced with pool.imap_unordered without + initializer). This is as fast as before but consumes much less memory, enabling the algorithm to also run + on smaller machines while still benefiting greatly from more CPUs. Fixes issue #5. + + 0.3.5 (2020-08-07) ------------------ diff --git a/sensormapgeo/transformer_3d.py b/sensormapgeo/transformer_3d.py index b0076d526176a3a16e87d7bc2ef07f0c42a05243..10f539b95bc6346c6212e45626ce2656c1c47fea 100644 --- a/sensormapgeo/transformer_3d.py +++ b/sensormapgeo/transformer_3d.py @@ -96,19 +96,19 @@ class SensorMapGeometryTransformer3D(object): # bands: multiprocessing uses multiprocessing.Pool, implemented in to_map_geometry / to_sensor_geometry # tiles: multiprocessing uses OpenMP implemented in pykdtree which is used by pyresample self.opts['nprocs'] = opts.get('nprocs', multiprocessing.cpu_count()) + if self.opts['nprocs'] > multiprocessing.cpu_count(): + self.opts['nprocs'] = multiprocessing.cpu_count() self.mp_alg = ('bands' if self.lons.shape[2] >= opts['nprocs'] else 'tiles') if mp_alg == 'auto' else mp_alg @staticmethod def _to_map_geometry_2D(kwargs_dict: dict ) -> Tuple[np.ndarray, tuple, str, int]: - assert [var is not None for var in (_global_shared_lons, _global_shared_lats, _global_shared_data)] - - SMGT2D = SensorMapGeometryTransformer(lons=_global_shared_lons[:, :, kwargs_dict['band_idx']], - lats=_global_shared_lats[:, :, kwargs_dict['band_idx']], + SMGT2D = SensorMapGeometryTransformer(lons=kwargs_dict['lons'], + lats=kwargs_dict['lats'], resamp_alg=kwargs_dict['resamp_alg'], radius_of_influence=kwargs_dict['radius_of_influence'], **kwargs_dict['init_opts']) - data_mapgeo, out_gt, 
out_prj = SMGT2D.to_map_geometry(data=_global_shared_data[:, :, kwargs_dict['band_idx']], + data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry(data=kwargs_dict['data'], area_definition=kwargs_dict['area_definition']) return data_mapgeo, out_gt, out_prj, kwargs_dict['band_idx'] @@ -214,16 +214,20 @@ class SensorMapGeometryTransformer3D(object): radius_of_influence=self.radius_of_influence, init_opts=init_opts, area_definition=area_definition, - band_idx=band + band_idx=band, + lons=self.lons[:, :, band], + lats=self.lats[:, :, band], + data=data[:, :, band], ) for band in range(data.shape[2])] if self.opts['nprocs'] > 1 and self.mp_alg == 'bands': - with multiprocessing.Pool(self.opts['nprocs'], - initializer=_initializer, - initargs=(self.lats, self.lons, data)) as pool: - result = pool.map(self._to_map_geometry_2D, args) + # NOTE: We use imap_unordered here as it yields each result as soon as it is available (works like a generator). + # This saves a lot of memory compared with map. We also don't use an initializer to share the input + # arrays because this would allocate the memory for the input arrays of all bands at once. 
+ with multiprocessing.Pool(self.opts['nprocs']) as pool: + result = [res for res in pool.imap_unordered(self._to_map_geometry_2D, args)] + else: - _initializer(self.lats, self.lons, data) result = [self._to_map_geometry_2D(argsdict) for argsdict in args] band_inds = [res[-1] for res in result] @@ -236,14 +240,12 @@ class SensorMapGeometryTransformer3D(object): @staticmethod def _to_sensor_geometry_2D(kwargs_dict: dict ) -> (np.ndarray, int): - assert [var is not None for var in (_global_shared_lons, _global_shared_lats, _global_shared_data)] - - SMGT2D = SensorMapGeometryTransformer(lons=_global_shared_lons[:, :, kwargs_dict['band_idx']], - lats=_global_shared_lats[:, :, kwargs_dict['band_idx']], + SMGT2D = SensorMapGeometryTransformer(lons=kwargs_dict['lons'], + lats=kwargs_dict['lats'], resamp_alg=kwargs_dict['resamp_alg'], radius_of_influence=kwargs_dict['radius_of_influence'], **kwargs_dict['init_opts']) - data_sensorgeo = SMGT2D.to_sensor_geometry(data=_global_shared_data[:, :, kwargs_dict['band_idx']], + data_sensorgeo = SMGT2D.to_sensor_geometry(data=kwargs_dict['data'], src_prj=kwargs_dict['src_prj'], src_extent=kwargs_dict['src_extent']) @@ -273,37 +275,21 @@ class SensorMapGeometryTransformer3D(object): init_opts=init_opts, src_prj=src_prj, src_extent=src_extent, - band_idx=band + band_idx=band, + lons=self.lons[:, :, band], + lats=self.lats[:, :, band], + data=data[:, :, band], + ) for band in range(data.shape[2])] if self.opts['nprocs'] > 1 and self.mp_alg == 'bands': - with multiprocessing.Pool(self.opts['nprocs'], - initializer=_initializer, - initargs=(self.lats, self.lons, data)) as pool: - result = pool.map(self._to_sensor_geometry_2D, args) + # NOTE: See the comments in the to_map_geometry() method. 
+ with multiprocessing.Pool(self.opts['nprocs']) as pool: + result = [res for res in pool.imap_unordered(self._to_sensor_geometry_2D, args)] else: - _initializer(self.lats, self.lons, data) result = [self._to_sensor_geometry_2D(argsdict) for argsdict in args] band_inds = list(np.array(result)[:, -1]) data_sensorgeo = np.dstack([result[band_inds.index(i)][0] for i in range(data.shape[2])]) return data_sensorgeo - - -_global_shared_lats = None -_global_shared_lons = None -_global_shared_data = None - - -def _initializer(lats, lons, data): - """Declare global variables needed for SensorMapGeometryTransformer3D.to_map_geometry and to_sensor_geometry. - - :param lats: - :param lons: - :param data: - """ - global _global_shared_lats, _global_shared_lons, _global_shared_data - _global_shared_lats = lats - _global_shared_lons = lons - _global_shared_data = data