Commit 7110af86 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

First part of the solution.


Signed-off-by: Daniel Scheffler's avatarDaniel Scheffler <danschef@gfz-potsdam.de>
parent effe1819
Pipeline #11259 passed with stage
in 1 minute and 43 seconds
......@@ -96,19 +96,19 @@ class SensorMapGeometryTransformer3D(object):
# bands: multiprocessing uses multiprocessing.Pool, implemented in to_map_geometry / to_sensor_geometry
# tiles: multiprocessing uses OpenMP implemented in pykdtree which is used by pyresample
self.opts['nprocs'] = opts.get('nprocs', multiprocessing.cpu_count())
if self.opts['nprocs'] > multiprocessing.cpu_count():
self.opts['nprocs'] = multiprocessing.cpu_count()
self.mp_alg = ('bands' if self.lons.shape[2] >= opts['nprocs'] else 'tiles') if mp_alg == 'auto' else mp_alg
@staticmethod
def _to_map_geometry_2D(kwargs_dict: dict
) -> Tuple[np.ndarray, tuple, str, int]:
assert [var is not None for var in (_global_shared_lons, _global_shared_lats, _global_shared_data)]
SMGT2D = SensorMapGeometryTransformer(lons=_global_shared_lons[:, :, kwargs_dict['band_idx']],
lats=_global_shared_lats[:, :, kwargs_dict['band_idx']],
SMGT2D = SensorMapGeometryTransformer(lons=kwargs_dict['lons'],
lats=kwargs_dict['lats'],
resamp_alg=kwargs_dict['resamp_alg'],
radius_of_influence=kwargs_dict['radius_of_influence'],
**kwargs_dict['init_opts'])
data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry(data=_global_shared_data[:, :, kwargs_dict['band_idx']],
data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry(data=kwargs_dict['data'],
area_definition=kwargs_dict['area_definition'])
return data_mapgeo, out_gt, out_prj, kwargs_dict['band_idx']
......@@ -214,16 +214,30 @@ class SensorMapGeometryTransformer3D(object):
radius_of_influence=self.radius_of_influence,
init_opts=init_opts,
area_definition=area_definition,
band_idx=band
band_idx=band,
lons=self.lons[:, :, band],
lats=self.lats[:, :, band],
data=data[:, :, band],
) for band in range(data.shape[2])]
if self.opts['nprocs'] > 1 and self.mp_alg == 'bands':
with multiprocessing.Pool(self.opts['nprocs'],
initializer=_initializer,
initargs=(self.lats, self.lons, data)) as pool:
result = pool.map(self._to_map_geometry_2D, args)
from time import time
t0 = time()
# data_mapgeo_v2 = np.empty((area_definition.height,
# area_definition.width,
# data.shape[2]),
# dtype=data.dtype)
with multiprocessing.Pool(self.opts['nprocs']) as pool:
result = [res for res in pool.imap_unordered(self._to_map_geometry_2D, args)]
# for res in pool.imap_unordered(self._to_map_geometry_2D, args):
# data_mapgeo_v2[:, :, res[-1]] = res[0]
print(time() - t0)
else:
_initializer(self.lats, self.lons, data)
result = [self._to_map_geometry_2D(argsdict) for argsdict in args]
band_inds = [res[-1] for res in result]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment