diff --git a/sensormapgeo/transformer_2d.py b/sensormapgeo/transformer_2d.py index 9a1aefb6b91b6612bd181315aa1d0f6c5fe70b25..140540ce1f1be24851c6e76f08ce1e7b5793e0c8 100644 --- a/sensormapgeo/transformer_2d.py +++ b/sensormapgeo/transformer_2d.py @@ -172,7 +172,8 @@ class SensorMapGeometryTransformer(object): drv_vrt = gdal.GetDriverByName("VRT") # noinspection PyUnusedLocal vrt = raiseErr_if_empty(drv_vrt.CreateCopy(path_xycoords_vrt, ds_xy_coords)) - del ds_xy_coords, vrt + ds_xy_coords = None + vrt = None # create VRT for one data band mask_band = np.ones((data.shape[:2]), np.int32) @@ -190,9 +191,10 @@ class SensorMapGeometryTransformer(object): "SRS": EPSG2WKT(tgt_epsg), }, "GEOLOCATION") vrt.FlushCache() - del ds_data, vrt + ds_data = None + vrt = None - subcall_with_output("gdalwarp '%s' '%s' " + output, exitcode, err = subcall_with_output("gdalwarp '%s' '%s' " '-geoloc ' '-t_srs EPSG:%d ' '-srcnodata 0 ' @@ -207,6 +209,10 @@ class SensorMapGeometryTransformer(object): ' -tr %s %s' % tgt_res if tgt_res else '',), v=True) + if exitcode: + print('ERROR!!', err.decode('latin-1')) + raise RuntimeError(err.decode('latin-1')) + # get output X/Y size ds_out = raiseErr_if_empty(gdal.Open(path_data_out)) diff --git a/sensormapgeo/transformer_3d.py b/sensormapgeo/transformer_3d.py index 5422fb0effc05683353024341c93de8f81b07b62..d2c323923625c5b4f104ac2b96aaff6ac7c6649d 100644 --- a/sensormapgeo/transformer_3d.py +++ b/sensormapgeo/transformer_3d.py @@ -97,12 +97,12 @@ class SensorMapGeometryTransformer3D(object): self.opts['nprocs'] = opts.get('nprocs', multiprocessing.cpu_count()) self.mp_alg = ('bands' if self.lons.shape[2] >= opts['nprocs'] else 'tiles') if mp_alg == 'auto' else mp_alg - # override self.mp_alg if SensorMapGeometryTransformer3D is called by nosetest or unittest - is_called_by_nose_cmd = 'nosetest' in sys.argv[0] - if self.opts['nprocs'] > 1 and self.mp_alg == 'bands' and is_called_by_nose_cmd: - warnings.warn("mp_alg='bands' causes deadlocks if SensorMapGeometryTransformer3D is called within a " - "nosetest console call. Using mp_alg='tiles'.") - self.mp_alg = 'tiles' + # # override self.mp_alg if SensorMapGeometryTransformer3D is called by nosetest or unittest + # is_called_by_nose_cmd = 'nosetest' in sys.argv[0] + # if self.opts['nprocs'] > 1 and self.mp_alg == 'bands' and is_called_by_nose_cmd: + # warnings.warn("mp_alg='bands' causes deadlocks if SensorMapGeometryTransformer3D is called within a " + # "nosetest console call. Using mp_alg='tiles'.") + # self.mp_alg = 'tiles' @staticmethod def _to_map_geometry_2D(kwargs_dict: dict) -> Tuple[np.ndarray, tuple, str, int]: @@ -113,12 +113,37 @@ class SensorMapGeometryTransformer3D(object): resamp_alg=kwargs_dict['resamp_alg'], radius_of_influence=kwargs_dict['radius_of_influence'], **kwargs_dict['init_opts']) - data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry(data=_global_shared_data[:, :, kwargs_dict['band_idx']], - tgt_prj=kwargs_dict['tgt_prj'], - tgt_extent=kwargs_dict['tgt_extent'], - tgt_res=kwargs_dict['tgt_res']) - - return data_mapgeo, out_gt, out_prj, kwargs_dict['band_idx'] + try: + data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry(data=_global_shared_data[:, :, kwargs_dict['band_idx']], + tgt_prj=kwargs_dict['tgt_prj'], + tgt_extent=kwargs_dict['tgt_extent'], + tgt_res=kwargs_dict['tgt_res']) + except: + print('CAUGHT %s' % kwargs_dict['band_idx']) + try: + data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry(data=_global_shared_data[:, :, kwargs_dict['band_idx']], + tgt_prj=kwargs_dict['tgt_prj'], + tgt_extent=kwargs_dict['tgt_extent'], + tgt_res=kwargs_dict['tgt_res']) + except: + print('TWICE %s' % kwargs_dict['band_idx']) + try: + data_mapgeo, out_gt, out_prj = SMGT2D.to_map_geometry( + data=_global_shared_data[:, :, kwargs_dict['band_idx']], + tgt_prj=kwargs_dict['tgt_prj'], + tgt_extent=kwargs_dict['tgt_extent'], + tgt_res=kwargs_dict['tgt_res']) + except: + print('3 TIMES %s' % kwargs_dict['band_idx']) + + import dill + import os + with open(os.path.join('/home/gfz-fe/scheffler/temp/EnPT/', 'band%s' % kwargs_dict['band_idx']), 'wb') as outF: + dill.dump((data_mapgeo, out_gt, out_prj, kwargs_dict['band_idx']), outF) + + print(_global_shared_data.dtype, data_mapgeo.shape, data_mapgeo.dtype) + + # return data_mapgeo, out_gt, out_prj, kwargs_dict['band_idx'] def _get_common_target_extent(self, tgt_epsg): corner_coords_ll = [[self.lons[0, 0, :].min(), self.lats[0, 0, :].max()], # common UL_xy @@ -130,6 +155,18 @@ class SensorMapGeometryTransformer3D(object): return tgt_extent + def _compute_common_areadefinition_sensor2map(self): + # data: np.ndarray, + # tgt_prj: Union[int, str], + # tgt_extent: Tuple[float, float, float, float] = None, + # tgt_res: Tuple[float, float] = None) -> AreaDefinition: + SMGT2D = SensorMapGeometryTransformer( + lons=_global_shared_lons[:, :, kwargs_dict['band_idx']], + lats=_global_shared_lats[:, :, kwargs_dict['band_idx']], + resamp_alg=kwargs_dict['resamp_alg'], + radius_of_influence=kwargs_dict['radius_of_influence'], + **kwargs_dict['init_opts']) + def to_map_geometry(self, data: np.ndarray, tgt_prj: Union[str, int], @@ -153,6 +190,12 @@ class SensorMapGeometryTransformer3D(object): tgt_epsg = WKT2EPSG(proj4_to_WKT(get_proj4info(proj=tgt_prj))) tgt_extent = tgt_extent or self._get_common_target_extent(tgt_epsg) + # def compute_areadefinition_sensor2map(self, + # data: np.ndarray, + # tgt_prj: Union[int, str], + # tgt_extent: Tuple[float, float, float, float] = None, + # tgt_res: Tuple[float, float] = None) -> AreaDefinition: + init_opts = self.opts.copy() if self.mp_alg == 'bands': del init_opts['nprocs'] # avoid sub-multiprocessing @@ -167,11 +210,42 @@ class SensorMapGeometryTransformer3D(object): band_idx=band ) for band in range(data.shape[2])] + print(self.mp_alg) + # self.opts['nprocs'] = 4 # FIXME if self.opts['nprocs'] > 1 and self.mp_alg == 'bands': with multiprocessing.Pool(self.opts['nprocs'], initializer=_initializer, initargs=(self.lats, self.lons, data)) as pool: - result = pool.map(self._to_map_geometry_2D, args) + # result = pool.map(self._to_map_geometry_2D, args) + + result = pool.map_async(self._to_map_geometry_2D, args, chunksize=1) + while True: + if result.ready(): + result = result.get() + break + + # result = list(pool.imap_unordered(self._to_map_geometry_2D, args)) + + # _initializer(self.lats, self.lons, data) + # + # try: + # self.lats, self.lons = None, None + # + # with multiprocessing.Pool(self.opts['nprocs']) as pool: + # result = pool.map(self._to_map_geometry_2D, args) + # finally: + # self.lats, self.lons = _global_shared_lats, _global_shared_lons + # # _global_shared_lats, _global_shared_lons, _global_shared_data = None, None, None + # # global _global_shared_lats, _global_shared_lons, _global_shared_data + + import dill + import os + result = [] + for i in range(data.shape[2]): + print('loading band %s' %i) + with open(os.path.join('/home/gfz-fe/scheffler/temp/EnPT/', 'band%s' % i), 'rb') as inF: + result.append(dill.load(inF)) + else: _initializer(self.lats, self.lons, data) result = [self._to_map_geometry_2D(argsdict) for argsdict in args]