Commit 9f85f922 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Merge branch 'enhancement/reduce_memory_consumption' into 'master'

Enhancement/reduce memory consumption

Closes #5

See merge request !4
parents effe1819 bb0ef7fa
Pipeline #11261 passed with stages
in 8 minutes and 30 seconds
...@@ -2,6 +2,14 @@ ...@@ -2,6 +2,14 @@
History History
======= =======
0.4.0 (2020-08-07)
------------------
* Revised the way how multiprocessing is called in the 3D transformer (replaced with pool.imap_unordered without
initializer). This is as fast as before but has a much smaller memory consumption enabling the algorithm to also run
on smaller machines while still highly benefiting from more CPUs. Fixes issue #5.
0.3.5 (2020-08-07) 0.3.5 (2020-08-07)
------------------ ------------------
......
...@@ -96,19 +96,19 @@ class SensorMapGeometryTransformer3D(object): ...@@ -96,19 +96,19 @@ class SensorMapGeometryTransformer3D(object):
# bands: multiprocessing uses multiprocessing.Pool, implemented in to_map_geometry / to_sensor_geometry # bands: multiprocessing uses multiprocessing.Pool, implemented in to_map_geometry / to_sensor_geometry
# tiles: multiprocessing uses OpenMP implemented in pykdtree which is used by pyresample # tiles: multiprocessing uses OpenMP implemented in pykdtree which is used by pyresample
self.opts['nprocs'] = opts.get('nprocs', multiprocessing.cpu_count()) self.opts['nprocs'] = opts.get('nprocs', multiprocessing.cpu_count())
if self.opts['nprocs'] > multiprocessing.cpu_count():
self.opts['nprocs'] = multiprocessing.cpu_count()
self.mp_alg = ('bands' if self.lons.shape[2] >= opts['nprocs'] else 'tiles') if mp_alg == 'auto' else mp_alg self.mp_alg = ('bands' if self.lons.shape[2] >= opts['nprocs'] else 'tiles') if mp_alg == 'auto' else mp_alg
@staticmethod @staticmethod
def _to_map_geometry_2D(kwargs_dict: dict def _to_map_geometry_2D(kwargs_dict: dict
) -> Tuple[np.ndarray, tuple, str, int]: ) -> Tuple[np.ndarray, tuple, str, int]:
assert [var is not None for var in (_global_shared_lons, _global_shared_lats, _global_shared_data)] SMGT2D = SensorMapGeometryTransformer(lons=kwargs_dict['lons'],
lats=kwargs_dict['lats'],
SMGT2D = SensorMapGeometryTransformer(lons=_global_shared_lons[:, :, kwargs_dict['band_idx']],
lats=_global_shared_lats[:, :, kwargs_dict['band_idx']],
resamp_alg=kwargs_dict['resamp_alg'], resamp_alg=kwargs_dict['resamp_alg'],
radius_of_influence=kwargs_dict['radius_of_influence'], radius_of_influence=kwargs_dict['radius_of_influence'],
**kwargs_dict['init_opts']) **kwargs_dict['init_opts'])
data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry(data=_global_shared_data[:, :, kwargs_dict['band_idx']], data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry(data=kwargs_dict['data'],
area_definition=kwargs_dict['area_definition']) area_definition=kwargs_dict['area_definition'])
return data_mapgeo, out_gt, out_prj, kwargs_dict['band_idx'] return data_mapgeo, out_gt, out_prj, kwargs_dict['band_idx']
...@@ -214,16 +214,20 @@ class SensorMapGeometryTransformer3D(object): ...@@ -214,16 +214,20 @@ class SensorMapGeometryTransformer3D(object):
radius_of_influence=self.radius_of_influence, radius_of_influence=self.radius_of_influence,
init_opts=init_opts, init_opts=init_opts,
area_definition=area_definition, area_definition=area_definition,
band_idx=band band_idx=band,
lons=self.lons[:, :, band],
lats=self.lats[:, :, band],
data=data[:, :, band],
) for band in range(data.shape[2])] ) for band in range(data.shape[2])]
if self.opts['nprocs'] > 1 and self.mp_alg == 'bands': if self.opts['nprocs'] > 1 and self.mp_alg == 'bands':
with multiprocessing.Pool(self.opts['nprocs'], # NOTE: We use imap here as it directly returns the results when available (works like a generator).
initializer=_initializer, # This saves a lot of memory compared with map. We also don't use an initializer to share the input
initargs=(self.lats, self.lons, data)) as pool: # arrays because this would allocate the memory for the input arrays of all bands at once.
result = pool.map(self._to_map_geometry_2D, args) with multiprocessing.Pool(self.opts['nprocs']) as pool:
result = [res for res in pool.imap_unordered(self._to_map_geometry_2D, args)]
else: else:
_initializer(self.lats, self.lons, data)
result = [self._to_map_geometry_2D(argsdict) for argsdict in args] result = [self._to_map_geometry_2D(argsdict) for argsdict in args]
band_inds = [res[-1] for res in result] band_inds = [res[-1] for res in result]
...@@ -236,14 +240,12 @@ class SensorMapGeometryTransformer3D(object): ...@@ -236,14 +240,12 @@ class SensorMapGeometryTransformer3D(object):
@staticmethod @staticmethod
def _to_sensor_geometry_2D(kwargs_dict: dict def _to_sensor_geometry_2D(kwargs_dict: dict
) -> (np.ndarray, int): ) -> (np.ndarray, int):
assert [var is not None for var in (_global_shared_lons, _global_shared_lats, _global_shared_data)] SMGT2D = SensorMapGeometryTransformer(lons=kwargs_dict['lons'],
lats=kwargs_dict['lats'],
SMGT2D = SensorMapGeometryTransformer(lons=_global_shared_lons[:, :, kwargs_dict['band_idx']],
lats=_global_shared_lats[:, :, kwargs_dict['band_idx']],
resamp_alg=kwargs_dict['resamp_alg'], resamp_alg=kwargs_dict['resamp_alg'],
radius_of_influence=kwargs_dict['radius_of_influence'], radius_of_influence=kwargs_dict['radius_of_influence'],
**kwargs_dict['init_opts']) **kwargs_dict['init_opts'])
data_sensorgeo = SMGT2D.to_sensor_geometry(data=_global_shared_data[:, :, kwargs_dict['band_idx']], data_sensorgeo = SMGT2D.to_sensor_geometry(data=kwargs_dict['data'],
src_prj=kwargs_dict['src_prj'], src_prj=kwargs_dict['src_prj'],
src_extent=kwargs_dict['src_extent']) src_extent=kwargs_dict['src_extent'])
...@@ -273,37 +275,21 @@ class SensorMapGeometryTransformer3D(object): ...@@ -273,37 +275,21 @@ class SensorMapGeometryTransformer3D(object):
init_opts=init_opts, init_opts=init_opts,
src_prj=src_prj, src_prj=src_prj,
src_extent=src_extent, src_extent=src_extent,
band_idx=band band_idx=band,
lons=self.lons[:, :, band],
lats=self.lats[:, :, band],
data=data[:, :, band],
) for band in range(data.shape[2])] ) for band in range(data.shape[2])]
if self.opts['nprocs'] > 1 and self.mp_alg == 'bands': if self.opts['nprocs'] > 1 and self.mp_alg == 'bands':
with multiprocessing.Pool(self.opts['nprocs'], # NOTE: See the comments in the to_map_geometry() method.
initializer=_initializer, with multiprocessing.Pool(self.opts['nprocs']) as pool:
initargs=(self.lats, self.lons, data)) as pool: result = [res for res in pool.imap_unordered(self._to_sensor_geometry_2D, args)]
result = pool.map(self._to_sensor_geometry_2D, args)
else: else:
_initializer(self.lats, self.lons, data)
result = [self._to_sensor_geometry_2D(argsdict) for argsdict in args] result = [self._to_sensor_geometry_2D(argsdict) for argsdict in args]
band_inds = list(np.array(result)[:, -1]) band_inds = list(np.array(result)[:, -1])
data_sensorgeo = np.dstack([result[band_inds.index(i)][0] for i in range(data.shape[2])]) data_sensorgeo = np.dstack([result[band_inds.index(i)][0] for i in range(data.shape[2])])
return data_sensorgeo return data_sensorgeo
_global_shared_lats = None
_global_shared_lons = None
_global_shared_data = None
def _initializer(lats, lons, data):
"""Declare global variables needed for SensorMapGeometryTransformer3D.to_map_geometry and to_sensor_geometry.
:param lats:
:param lons:
:param data:
"""
global _global_shared_lats, _global_shared_lons, _global_shared_data
_global_shared_lats = lats
_global_shared_lons = lons
_global_shared_data = data
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment