diff --git a/enpt/io/reader.py b/enpt/io/reader.py index 7c598145839f5aaaa63c8141dacd23334f21e968..e3a6ce2dd9f1badfa8f5d925f371d042dea4fef2 100644 --- a/enpt/io/reader.py +++ b/enpt/io/reader.py @@ -16,8 +16,8 @@ class L1B_Reader(object): self.logger = logger or logging.getLogger(__name__) self.cfg = user_inputs - @staticmethod - def read_inputdata(root_dir, observation_time: datetime, lon_lat_smpl=(15, 15), nsmile_coef=5): + def read_inputdata(self, root_dir, observation_time: datetime, lon_lat_smpl=(15, 15), nsmile_coef=5, + snr_vnir=None, snr_swir=None): # TODO move to init? """Read L1B, DEM and spatial reference data. @@ -50,8 +50,12 @@ class L1B_Reader(object): # compute radiance and calculate snr L1_obj.DN2TOARadiance() - L1_obj.vnir.detector_meta.calc_snr(data=L1_obj.vnir.data) - L1_obj.swir.detector_meta.calc_snr(data=L1_obj.swir.data) + if snr_vnir is not None: + self.logger.info("Compute SNR for vnir: %s" % snr_vnir) + L1_obj.vnir.detector_meta.calc_snr_vnir(detector=L1_obj.vnir, snr_data_fn=snr_vnir) + if snr_swir is not None: + self.logger.info("Compute SNR for swir: %s" % snr_swir) + L1_obj.swir.detector_meta.calc_snr_swir(detector=L1_obj.swir, snr_data_fn=snr_swir) return L1_obj diff --git a/enpt/model/metadata.py b/enpt/model/metadata.py index 011602d9645f1089c7cacb86be6741e6c63f5014..60ae405b6ea049fbc5af0858836adab7a7098d13 100644 --- a/enpt/model/metadata.py +++ b/enpt/model/metadata.py @@ -6,6 +6,7 @@ from xml.etree import ElementTree import logging import numpy as np from scipy.interpolate import interp2d +import spectral as sp L1B_product_props = dict( xml_detector_label=dict( @@ -105,6 +106,7 @@ class EnMAP_Metadata_L1B_Detector_SensorGeo(object): self.lons = self.interpolate_corners(*self.lon_UL_UR_LL_LR, *lon_lat_smpl) self.unit = 'none' # '" ".join(xml.findall("%s/radiance_unit" % lbl)[0].text.split()) self.unitcode = 'DN' + self.snr = None def calc_smile(self): """Compute smile for each EnMAP column. @@ -120,14 +122,39 @@ class EnMAP_Metadata_L1B_Detector_SensorGeo(object): self.smile_coef # shape = (nwvl, nsmile_coef) ) # shape = (ncols, nwvl) - def calc_snr(self, data: np.ndarray): + def calc_snr_vnir(self, detector, snr_data_fn: str) -> None: + """Compute EnMAP-VNIR SNR from radiance data. + + :param snr_data_fn: filename to SNR file + :param detector: vnir instance of EnMAP_Detector_SensorGeo + """ + assert self.unit == 'mW m^-2 sr^-1 nm^-1' + self.logger.info("Compute snr for: %s using: %s" % (self.detector_name, snr_data_fn)) + ds = sp.open_image(snr_data_fn) + p_hg = ds[0:3, :, :] + p_lg = ds[0:3, :, :] + l_th = np.squeeze(ds[6, :, :]) + + self.snr = np.zeros(detector.data.shape) + for irow in range(self.nrows): + hg = detector.data[irow, :, :] < l_th + l_hg = detector.data[irow, hg] + self.snr[irow, :, :][hg] = p_hg[0, hg] + p_hg[1, hg] * l_hg + p_hg[2, hg] * l_hg**2 + + lg = detector.data[irow, :, :] >= l_th + l_lg = detector.data[irow, lg] + self.snr[irow, :, :][lg] = p_lg[0, lg] + p_lg[1, lg] * l_lg + p_lg[2, lg] * l_lg ** 2 + + def calc_snr_swir(self, detector, snr_data_fn: str) -> None: """Compute EnMAP SNR from radiance data. - :param data: Numpy array with radiance for scene + :param snr_data_fn: filename to SNR file + :param detector: swir instance of EnMAP_Detector_SensorGeo """ - self.logger.info("Compute snr for: %s" % self.detector_name) - self.logger.warning("SNR model missing -> const. value of 500 is returned") - return 500 * np.ones(data.shape, dtype=np.float) + assert self.unit == 'mW m^-2 sr^-1 nm^-1' + self.logger.info("Compute snr for: %s using: %s" % (self.detector_name, snr_data_fn)) + pp = sp.open_image(snr_data_fn)[:, :, :] + self.snr = pp[0, :, :] + pp[1, :, :] * detector.data[:, :, :] + pp[2, :, :] * detector.data[:, :, :]**2 @staticmethod def interpolate_corners(ul: float, ur: float, ll: float, lr: float, nx: int, ny: int): diff --git a/setup.py b/setup.py index 414d0d0dcec914d0c6b8c8d7d60136475c57504e..409dec247657066e2f5b79b4a7e20d227fd41dfa 100644 --- a/setup.py +++ b/setup.py @@ -9,8 +9,8 @@ with open('README.rst') as readme_file: with open('HISTORY.rst') as history_file: history = history_file.read() -requirements = [ - 'numpy', 'scipy', 'geoarray' # put package requirements here +requirements = [ # put package requirements here + 'numpy', 'scipy', 'geoarray', 'spectral' ] test_requirements = ['coverage', 'nose', 'nose-htmloutput', 'rednose'] diff --git a/tests/data/EnMAP_Sensor/EnMAP_Level_1B_SNR.zip b/tests/data/EnMAP_Sensor/EnMAP_Level_1B_SNR.zip new file mode 100644 index 0000000000000000000000000000000000000000..a482a396c9773ed799abf30845eebf070b35f02c --- /dev/null +++ b/tests/data/EnMAP_Sensor/EnMAP_Level_1B_SNR.zip @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4f32268975fd276091c41d8b256ad3b65653000cf99f0d94f34e1e91c912ea92 +size 13778 diff --git a/tests/test_l1b_reader.py b/tests/test_l1b_reader.py index b9585d816c5bda4e21600c5b7a31b72a6986df73..4a45e63e9840b9bbbeb4c1919e9cdea68c92d75d 100644 --- a/tests/test_l1b_reader.py +++ b/tests/test_l1b_reader.py @@ -21,6 +21,8 @@ class Test_L1B_Reader(unittest.TestCase): def setUp(self): self.pathList_testimages = glob(os.path.join(os.path.dirname(__file__), "data", "EnMAP_Level_1B", "*.zip")) + self.l1b_snr_file = glob(os.path.join(os.path.dirname(__file__), + "data", "EnMAP_Sensor", "EnMAP_Level_1B_SNR.zip"))[0] self.user_config = dict() def tearDown(self): @@ -34,14 +36,23 @@ class Test_L1B_Reader(unittest.TestCase): for l1b_file in self.pathList_testimages: with tempfile.TemporaryDirectory() as tmpdir: print("Tmp dir: %s" % tmpdir) - with zipfile.ZipFile(l1b_file, "r") as zf: + with zipfile.ZipFile(self.l1b_snr_file, "r") as zf: zf.extractall(tmpdir) - root_dir = os.path.join(tmpdir, os.listdir(tmpdir)[0]) - L1_obj = L1B_Reader(**self.user_config)\ - .read_inputdata(root_dir, observation_time=datetime(2015, 12, 7, 10)) + with zipfile.ZipFile(l1b_file, "r") as zf: + zf.extractall(tmpdir) - self.assertIsInstance(L1_obj, EnMAPL1Product_SensorGeo) + root_dir = os.path.join(tmpdir, os.path.basename(l1b_file).split(".zip")[0]) + L1_obj_no_snr = L1B_Reader(**self.user_config)\ + .read_inputdata(root_dir, observation_time=datetime(2015, 12, 7, 10)) + + L1_obj_with_snr = L1B_Reader(**self.user_config) \ + .read_inputdata(root_dir, observation_time=datetime(2015, 12, 7, 10), + snr_vnir=os.path.join(tmpdir, "SNR_D1.hdr"), + snr_swir=os.path.join(tmpdir, "SNR_D2.hdr")) + + self.assertIsInstance(L1_obj_no_snr, EnMAPL1Product_SensorGeo) + self.assertIsInstance(L1_obj_with_snr, EnMAPL1Product_SensorGeo) if __name__ == "__main__":