Commit 5a437961 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

modified: sensormapgeo/transformer_2d.py

	modified:   sensormapgeo/transformer_3d.py
parent 84ab0604
Pipeline #8574 failed with stage
in 1 minute and 40 seconds
......@@ -172,7 +172,8 @@ class SensorMapGeometryTransformer(object):
drv_vrt = gdal.GetDriverByName("VRT")
# noinspection PyUnusedLocal
vrt = raiseErr_if_empty(drv_vrt.CreateCopy(path_xycoords_vrt, ds_xy_coords))
del ds_xy_coords, vrt
ds_xy_coords = None
vrt = None
# create VRT for one data band
mask_band = np.ones((data.shape[:2]), np.int32)
......@@ -190,9 +191,10 @@ class SensorMapGeometryTransformer(object):
"SRS": EPSG2WKT(tgt_epsg),
}, "GEOLOCATION")
vrt.FlushCache()
del ds_data, vrt
ds_data = None
vrt = None
subcall_with_output("gdalwarp '%s' '%s' "
output, exitcode, err = subcall_with_output("gdalwarp '%s' '%s' "
'-geoloc '
'-t_srs EPSG:%d '
'-srcnodata 0 '
......@@ -207,6 +209,10 @@ class SensorMapGeometryTransformer(object):
' -tr %s %s' % tgt_res if tgt_res else '',),
v=True)
if exitcode:
print('ERROR!!', err.decode('latin-1'))
raise RuntimeError(err.decode('latin-1'))
# get output X/Y size
ds_out = raiseErr_if_empty(gdal.Open(path_data_out))
......
......@@ -97,12 +97,12 @@ class SensorMapGeometryTransformer3D(object):
self.opts['nprocs'] = opts.get('nprocs', multiprocessing.cpu_count())
self.mp_alg = ('bands' if self.lons.shape[2] >= opts['nprocs'] else 'tiles') if mp_alg == 'auto' else mp_alg
# override self.mp_alg if SensorMapGeometryTransformer3D is called by nosetest or unittest
is_called_by_nose_cmd = 'nosetest' in sys.argv[0]
if self.opts['nprocs'] > 1 and self.mp_alg == 'bands' and is_called_by_nose_cmd:
warnings.warn("mp_alg='bands' causes deadlocks if SensorMapGeometryTransformer3D is called within a "
"nosetest console call. Using mp_alg='tiles'.")
self.mp_alg = 'tiles'
# # override self.mp_alg if SensorMapGeometryTransformer3D is called by nosetest or unittest
# is_called_by_nose_cmd = 'nosetest' in sys.argv[0]
# if self.opts['nprocs'] > 1 and self.mp_alg == 'bands' and is_called_by_nose_cmd:
# warnings.warn("mp_alg='bands' causes deadlocks if SensorMapGeometryTransformer3D is called within a "
# "nosetest console call. Using mp_alg='tiles'.")
# self.mp_alg = 'tiles'
@staticmethod
def _to_map_geometry_2D(kwargs_dict: dict) -> Tuple[np.ndarray, tuple, str, int]:
......@@ -113,12 +113,37 @@ class SensorMapGeometryTransformer3D(object):
resamp_alg=kwargs_dict['resamp_alg'],
radius_of_influence=kwargs_dict['radius_of_influence'],
**kwargs_dict['init_opts'])
try:
data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry(data=_global_shared_data[:, :, kwargs_dict['band_idx']],
tgt_prj=kwargs_dict['tgt_prj'],
tgt_extent=kwargs_dict['tgt_extent'],
tgt_res=kwargs_dict['tgt_res'])
except:
print('CAUGHT %s' % kwargs_dict['band_idx'])
try:
data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry(data=_global_shared_data[:, :, kwargs_dict['band_idx']],
tgt_prj=kwargs_dict['tgt_prj'],
tgt_extent=kwargs_dict['tgt_extent'],
tgt_res=kwargs_dict['tgt_res'])
except:
print('TWICE %s' % kwargs_dict['band_idx'])
try:
data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry(
data=_global_shared_data[:, :, kwargs_dict['band_idx']],
tgt_prj=kwargs_dict['tgt_prj'],
tgt_extent=kwargs_dict['tgt_extent'],
tgt_res=kwargs_dict['tgt_res'])
except:
print('3 TIMES %s' % kwargs_dict['band_idx'])
import dill
import os
with open(os.path.join('/home/gfz-fe/scheffler/temp/EnPT/', 'band%s' % kwargs_dict['band_idx']), 'wb') as outF:
dill.dump((data_mapgeo, out_gt, out_prj, kwargs_dict['band_idx']), outF)
return data_mapgeo, out_gt, out_prj, kwargs_dict['band_idx']
print(_global_shared_data.dtype, data_mapgeo.shape, data_mapgeo.dtype)
# return data_mapgeo, out_gt, out_prj, kwargs_dict['band_idx']
def _get_common_target_extent(self, tgt_epsg):
corner_coords_ll = [[self.lons[0, 0, :].min(), self.lats[0, 0, :].max()], # common UL_xy
......@@ -130,6 +155,18 @@ class SensorMapGeometryTransformer3D(object):
return tgt_extent
def _compute_common_areadefinition_sensor2map(self):
# data: np.ndarray,
# tgt_prj: Union[int, str],
# tgt_extent: Tuple[float, float, float, float] = None,
# tgt_res: Tuple[float, float] = None) -> AreaDefinition:
SMGT2D = SensorMapGeometryTransformer(
lons=_global_shared_lons[:, :, kwargs_dict['band_idx']],
lats=_global_shared_lats[:, :, kwargs_dict['band_idx']],
resamp_alg=kwargs_dict['resamp_alg'],
radius_of_influence=kwargs_dict['radius_of_influence'],
**kwargs_dict['init_opts'])
def to_map_geometry(self,
data: np.ndarray,
tgt_prj: Union[str, int],
......@@ -153,6 +190,12 @@ class SensorMapGeometryTransformer3D(object):
tgt_epsg = WKT2EPSG(proj4_to_WKT(get_proj4info(proj=tgt_prj)))
tgt_extent = tgt_extent or self._get_common_target_extent(tgt_epsg)
# def compute_areadefinition_sensor2map(self,
# data: np.ndarray,
# tgt_prj: Union[int, str],
# tgt_extent: Tuple[float, float, float, float] = None,
# tgt_res: Tuple[float, float] = None) -> AreaDefinition:
init_opts = self.opts.copy()
if self.mp_alg == 'bands':
del init_opts['nprocs'] # avoid sub-multiprocessing
......@@ -167,11 +210,42 @@ class SensorMapGeometryTransformer3D(object):
band_idx=band
) for band in range(data.shape[2])]
print(self.mp_alg)
# self.opts['nprocs'] = 4 # FIXME
if self.opts['nprocs'] > 1 and self.mp_alg == 'bands':
with multiprocessing.Pool(self.opts['nprocs'],
initializer=_initializer,
initargs=(self.lats, self.lons, data)) as pool:
result = pool.map(self._to_map_geometry_2D, args)
# result = pool.map(self._to_map_geometry_2D, args)
result = pool.map_async(self._to_map_geometry_2D, args, chunksize=1)
while True:
if result.ready():
result = result.get()
break
# result = list(pool.imap_unordered(self._to_map_geometry_2D, args))
# _initializer(self.lats, self.lons, data)
#
# try:
# self.lats, self.lons = None, None
#
# with multiprocessing.Pool(self.opts['nprocs']) as pool:
# result = pool.map(self._to_map_geometry_2D, args)
# finally:
# self.lats, self.lons = _global_shared_lats, _global_shared_lons
# # _global_shared_lats, _global_shared_lons, _global_shared_data = None, None, None
# # global _global_shared_lats, _global_shared_lons, _global_shared_data
import dill
import os
result = []
for i in range(data.shape[2]):
print('loading band %s' %i)
with open(os.path.join('/home/gfz-fe/scheffler/temp/EnPT/', 'band%s' % i), 'rb') as inF:
result.append(dill.load(inF))
else:
_initializer(self.lats, self.lons, data)
result = [self._to_map_geometry_2D(argsdict) for argsdict in args]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment