metadata.py 14.9 KB
Newer Older
1
# -*- coding: utf-8 -*-
2
"""EnPT metadata modules. All object and functions regarding EnMAP metadata are implemented here."""
3
4
5
6

from datetime import datetime
from xml.etree import ElementTree
import logging
7
import os
8
from typing import Union
9
10
import numpy as np
from scipy.interpolate import interp2d
André Hollstein's avatar
André Hollstein committed
11
import spectral as sp
12
from geoarray import GeoArray
André Hollstein's avatar
André Hollstein committed
13

14
from ..options.config import EnPTConfig
15
from .srf import SRF
16
17


18
19
20
L1B_product_props = dict(
    xml_detector_label=dict(
        VNIR='detector1',
Daniel Scheffler's avatar
Daniel Scheffler committed
21
        SWIR='detector2'),
22
23
    fn_detector_suffix=dict(
        VNIR='D1',
Daniel Scheffler's avatar
Daniel Scheffler committed
24
        SWIR='D2')
25
26
27
)


28
29
30
#########################################################
# EnPT metadata objects for EnMAP data in sensor geometry
#########################################################
31
32


33
34
class EnMAP_Metadata_L1B_Detector_SensorGeo(object):
    """Class for all EnMAP metadata associated with a single EnMAP detector in sensor geometry.
35
36

    NOTE:
37
        - All metadata that have VNIR and SWIR detector in sensor geometry in common should be included here.
38
39

    """
Daniel Scheffler's avatar
Daniel Scheffler committed
40

41
    def __init__(self, detector_name: str, config: EnPTConfig, logger: logging.Logger=None):
Daniel Scheffler's avatar
Daniel Scheffler committed
42
        """Get a metadata object containing the metadata of a single EnMAP detector in sensor geometry.
43
44
45
46

        :param detector_name: Name of the detector (VNIR or SWIR)
        :param logger:
        """
47
        self.cfg = config
48
        self.detector_name = detector_name  # type: str
49
        self.detector_label = L1B_product_props['xml_detector_label'][detector_name]
50
51
52
53
        self.logger = logger or logging.getLogger()

        self.fwhm = None  # type: np.ndarray  # Full width half maximmum for each EnMAP band
        self.wvl_center = None  # type: np.ndarray  # Center wavelengths for each EnMAP band
54
55
        self.srf = None  # type: SRF  # SRF object holding the spectral response functions for each EnMAP band
        self.solar_irrad = None  # type: dict  # solar irradiance in [W/m2/nm] for eac band
56
57
58
59
60
61
62
63
64
65
66
67
        self.nwvl = None  # type: int  # Number of wave bands
        self.nrows = None  # type: int  # number of rows
        self.ncols = None  # type: int  # number of columns
        self.smile_coef = None  # type: np.ndarray  # smile coefficients needed for smile computation
        self.nsmile_coef = None  # type: int  # number of smile coefficients
        self.smile = None  # type: np.ndarray  # smile for each EnMAP image column
        self.l_min = None  # type: np.ndarray
        self.l_max = None  # type: np.ndarray
        self.geom_view_zenith = None  # type: float  # viewing zenith angle
        self.geom_view_azimuth = None  # type: float  # viewing azimuth angle
        self.geom_sun_zenith = None  # type: float  # sun zenith angle
        self.geom_sun_azimuth = None  # type: float  # sun azimuth angle
68
        self.mu_sun = None  # type: float  # needed by SICOR for TOARad > TOARef conversion
Daniel Scheffler's avatar
Daniel Scheffler committed
69
70
        self.lat_UL_UR_LL_LR = None  # type:  list  # latitude coordinates for UL, UR, LL, LR
        self.lon_UL_UR_LL_LR = None  # type:  list  # longitude coordinates for UL, UR, LL, LR
71
72
73
        self.lats = None  # type: np.ndarray  # 2D array of latitude coordinates according to given lon/lat sampling
        self.lons = None  # type: np.ndarray  # 2D array of longitude coordinates according to given lon/lat sampling
        self.unit = ''  # type: str  # radiometric unit of pixel values
74
        self.unitcode = ''  # type: str  # code of radiometric unit
75
76
        self.snr = None  # type: np.ndarray  # Signal to noise ratio as computed from radiance data

77
    def read_metadata(self, path_xml, lon_lat_smpl, nsmile_coef):
78
        """Read the metadata of a specific EnMAP detector in sensor geometry.
79
80
81
82
83

        :param path_xml:  file path of the metadata XML file
        :param lon_lat_smpl:  number if sampling points in lon, lat fields
        :param nsmile_coef:  number of polynomial coefficients for smile
        """
84
        xml = ElementTree.parse(path_xml).getroot()
85
        lbl = self.detector_label
86
        self.logger.info("Reading metadata for %s detector..." % self.detector_name)
87
88
89
90

        self.fwhm = np.array(xml.findall("%s/fwhm" % lbl)[0].text.replace("\n", "").split(), dtype=np.float)
        self.wvl_center = np.array(
            xml.findall("%s/centre_wavelength" % lbl)[0].text.replace("\n", "").split(), dtype=np.float)
91
92
        self.srf = SRF.from_cwl_fwhm(self.wvl_center, self.fwhm)
        self.solar_irrad = self.calc_solar_irradiance_CWL_FWHM_per_band()
93
94
95
96
97
98
99
100
101
102
103
104
        self.nwvl = len(self.wvl_center)
        self.nrows = np.int(xml.findall("%s/rows" % lbl)[0].text)
        self.ncols = np.int(xml.findall("%s/columns" % lbl)[0].text)
        self.smile_coef = np.array(xml.findall("%s/smile" % lbl)[0].text.replace("\n", "").split(), dtype=np.float) \
                            .reshape((-1, nsmile_coef + 1))[:, 1:]
        self.nsmile_coef = nsmile_coef
        self.smile = self.calc_smile()
        self.l_min = np.array(xml.findall("%s/L_min" % lbl)[0].text.split(), dtype=np.float)
        self.l_max = np.array(xml.findall("%s/L_max" % lbl)[0].text.split(), dtype=np.float)
        self.geom_view_zenith = np.float(
            xml.findall("%s/observation_geometry/zenith_angle" % lbl)[0].text.split()[0])
        self.geom_view_azimuth = np.float(
Daniel Scheffler's avatar
Daniel Scheffler committed
105
            xml.findall("%s/observation_geometry/azimuth_angle" % lbl)[0].text.split()[0])
106
107
        self.geom_sun_zenith = np.float(
            xml.findall("%s/illumination_geometry/zenith_angle" % lbl)[0].text.split()[0])
Daniel Scheffler's avatar
Daniel Scheffler committed
108
109
        self.geom_sun_azimuth = np.float(
            xml.findall("%s/illumination_geometry/azimuth_angle" % lbl)[0].text.split()[0])
110
        self.mu_sun = np.cos(np.deg2rad(self.geom_sun_zenith))
111
        self.lat_UL_UR_LL_LR = \
Daniel Scheffler's avatar
Daniel Scheffler committed
112
113
            [float(xml.findall("%s/geometry/bounding_box/%s_northing" % (lbl, corner))[0].text.split()[0])
             for corner in ("UL", "UR", "LL", "LR")]
114
        self.lon_UL_UR_LL_LR = \
Daniel Scheffler's avatar
Daniel Scheffler committed
115
116
            [float(xml.findall("%s/geometry/bounding_box/%s_easting" % (lbl, corner))[0].text.split()[0])
             for corner in ("UL", "UR", "LL", "LR")]
117
118
        self.lats = self.interpolate_corners(*self.lat_UL_UR_LL_LR, *lon_lat_smpl)
        self.lons = self.interpolate_corners(*self.lon_UL_UR_LL_LR, *lon_lat_smpl)
119
120
121

        try:
            self.unitcode = xml.findall("%s/unitcode" % lbl)[0].text
122
123
            # '" ".join(xml.findall("%s/radiance_unit" % lbl)[0].text.split())
            self.unit = xml.findall("%s/unit" % lbl)[0].text
124
125
        except IndexError:
            self.unitcode = 'DN'
126
127
128
            self.unit = 'none'
        except Exception:
            raise
129

André Hollstein's avatar
André Hollstein committed
130
        self.snr = None
131
132
133

    def calc_smile(self):
        """Compute smile for each EnMAP column.
Daniel Scheffler's avatar
Daniel Scheffler committed
134

135
136
137
138
139
140
141
142
143
144
145
        The sum in (1) is expressed as inner product of two arrays with inner dimension as the polynomial smile
        coefficients shape = (ncols, nsmile_coef) of polynomial x

        :return:
        """
        # smile(icol, iwvl) = sum_(p=0)^(nsmile_coef-1) smile_coef[iwvl, p] * icol**p (1)
        return np.inner(
            np.array([[icol ** p for p in np.arange(self.nsmile_coef)] for icol in np.arange(self.ncols)]),
            self.smile_coef  # shape = (nwvl, nsmile_coef)
        )  # shape = (ncols, nwvl)

146
147
    def calc_snr_from_radiance(self, rad_data: Union[GeoArray, np.ndarray], dir_snr_models: str):
        """Compute EnMAP SNR from radiance data for the given detector.
André Hollstein's avatar
André Hollstein committed
148

149
150
151
152
153
154
155
156
157
158
159
160
161
        SNR equation:    SNR = p0 + p1 * LTOA + p2 * LTOA ^ 2   [W/(m^2 sr nm)].
        NOTE:   The SNR model files (SNR_D1.bsq/SNR_D2.bsq) contain polynomial coefficients needed to compute SNR.
                SNR_D1.bsq: SNR model for EnMAP SWIR detector (contains high gain and low gain model coefficients)
                            - 1000 columns (for 1000 EnMAP columns)
                            - 88 bands for 88 EnMAP VNIR bands
                            - 7 lines:  - 3 lines: high gain coefficients
                                        - 3 lines: low gain coefficients
                                        - 1 line: threshold needed to decide about high gain or low gain
                SNR_D2.bsq: SNR model for EnMAP SWIR detector
                            - 1000 columns (for 1000 EnMAP columns)
                            - x bands for x EnMAP SWIR bands
                            - 3 lines for 3 coefficients

162
163
        :param rad_data:        image radiance data of EnMAP_Detector_SensorGeo
        :param dir_snr_models:  root directory where SNR model data is stored (must contain SNR_D1.bsq/SNR_D2.bsq)
André Hollstein's avatar
André Hollstein committed
164
        """
165
        path_snr_model = os.path.join(dir_snr_models, "SNR_D1.hdr" if self.detector_name == 'VNIR' else "SNR_D2.hdr")
166
        rad_data = np.array(rad_data)
167
168

        assert self.unitcode == 'TOARad'
169
        self.logger.info("Computing SNR for %s using %s" % (self.detector_name, path_snr_model))
170
171

        if self.detector_name == 'VNIR':
172
173
174
175
            gA = sp.open_image(path_snr_model)
            coeffs_highgain = gA[0:3, :, :]
            coeffs_lowgain = gA[3:6, :, :]
            gain_threshold = np.squeeze(gA[6, :, :])
176
177
178

            self.snr = np.zeros(rad_data.shape)
            for irow in range(self.nrows):
179
180
181
182
183
184
185
186
187
188
189
190
191
                highgain_mask = rad_data[irow, :, :] < gain_threshold  # a single row
                rad_highgain = rad_data[irow, highgain_mask]
                self.snr[irow, :, :][highgain_mask] = \
                    coeffs_highgain[0, highgain_mask] + \
                    coeffs_highgain[1, highgain_mask] * rad_highgain + \
                    coeffs_highgain[2, highgain_mask] * rad_highgain ** 2

                lowgain_mask = rad_data[irow, :, :] >= gain_threshold
                rad_lowgain = rad_data[irow, lowgain_mask]
                self.snr[irow, :, :][lowgain_mask] = \
                    coeffs_lowgain[0, lowgain_mask] + \
                    coeffs_lowgain[1, lowgain_mask] * rad_lowgain + \
                    coeffs_lowgain[2, lowgain_mask] * rad_lowgain ** 2
192
193

        else:
194
195
            coeffs = sp.open_image(path_snr_model)[:, :, :]
            self.snr = coeffs[0, :, :] + coeffs[1, :, :] * rad_data[:, :, :] + coeffs[2, :, :] * rad_data[:, :, :] ** 2
196
197
198
199
200
201
202
203
204
205
206
207
208
209

    @staticmethod
    def interpolate_corners(ul: float, ur: float, ll: float, lr: float, nx: int, ny: int):
        """Compute interpolated field from corner values of a scalar field given at: ul, ur, ll, lr.

        :param nx, ny: final shape
        """
        ff = interp2d(x=[0, 1], y=[0, 1], z=[[ul, ur], [ll, lr]], kind='linear')
        rr = np.zeros((nx, ny), dtype=np.float)
        for i, x in enumerate(np.linspace(0, 1, nx)):
            for j, y in enumerate(np.linspace(0, 1, ny)):
                rr[i, j] = ff(x, y)
        return rr

210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
    def calc_solar_irradiance_CWL_FWHM_per_band(self) -> dict:
        from ..io.reader import Solar_Irradiance_reader

        self.logger.debug('Calculating solar irradiance...')

        sol_irr = Solar_Irradiance_reader(path_solar_irr_model=self.cfg.path_solar_irr, wvl_min_nm=350, wvl_max_nm=2500)

        irr_bands = {}
        for band in self.srf.bands:
            if self.srf[band] is None:
                irr_bands[band] = None
            else:
                WVL_band = self.srf.srfs_wvl if self.srf.wvl_unit == 'nanometers' else self.srf.srfs_wvl * 1000
                RSP_band = self.srf.srfs_norm01[band]
                sol_irr_at_WVL = np.interp(WVL_band, sol_irr[:, 0], sol_irr[:, 1], left=0, right=0)

                irr_bands[band] = round(np.sum(sol_irr_at_WVL * RSP_band) / np.sum(RSP_band), 2)

        return irr_bands

230

231
class EnMAP_Metadata_L1B_SensorGeo(object):
Daniel Scheffler's avatar
Daniel Scheffler committed
232
    """EnMAP Metadata class holding the metadata of the complete EnMAP product in sensor geometry incl. VNIR and SWIR.
233
234
235
236

    Attributes:
        - logger(logging.Logger):  None or logging instance
        - observation_datetime(datetime.datetime):  datetime of observation time (currently missing in metadata)
237
238
        - vnir(EnMAP_Metadata_VNIR_SensorGeo)
        - swir(EnMAP_Metadata_SWIR_SensorGeo)
Daniel Scheffler's avatar
Daniel Scheffler committed
239

240
    """
Daniel Scheffler's avatar
Daniel Scheffler committed
241

242
    def __init__(self, path_metaxml, config: EnPTConfig, logger=None):
243
244
245
246
247
        """Get a metadata object instance for the given EnMAP L1B product in sensor geometry.

        :param path_metaxml:  file path of the EnMAP L1B metadata XML file
        :param logger:  instance of logging.logger or subclassed
        """
248
        self.cfg = config
249
        self.logger = logger or logging.getLogger()
Daniel Scheffler's avatar
Daniel Scheffler committed
250
        self._path_xml = path_metaxml
251

252
        # defaults
253
        self.observation_datetime = None  # type: datetime  # Date and Time of image observation
254
        self.earthSunDist = None  # type: float  # earth-sun distance # TODO doc correct?
255
256
        self.vnir = None  # type: EnMAP_Metadata_L1B_Detector_SensorGeo # metadata of VNIR only
        self.swir = None  # type: EnMAP_Metadata_L1B_Detector_SensorGeo # metadata of SWIR only
257
        self.detector_attrNames = ['vnir', 'swir']
258

Daniel Scheffler's avatar
Daniel Scheffler committed
259
    def read_common_meta(self, observation_time: datetime=None):
260
261
262
263
        """Read the metadata belonging to both, the VNIR and SWIR detector of the EnMAP L1B product in sensor geometry.

        :param observation_time:  date and time of image observation (datetime.datetime)
        """
264
265
        # FIXME observation time is currently missing in the XML
        self.observation_datetime = observation_time
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
        self.earthSunDist = self.get_earth_sun_distance(self.observation_datetime)

    def get_earth_sun_distance(self, acqDate: datetime):
        """Get earth sun distance (requires file of pre calculated earth sun distance per day)

        :param acqDate:
        """

        if not os.path.exists(self.cfg.path_earthSunDist):
            self.logger.warning("\n\t WARNING: Earth Sun Distance is assumed to be "
                                "1.0 because no database can be found at %s.""" % self.cfg.path_earthSunDist)
            return 1.0
        if not acqDate:
            self.logger.warning("\n\t WARNING: Earth Sun Distance is assumed to be 1.0 because "
                                "acquisition date could not be read from metadata.")
            return 1.0

        with open(self.cfg.path_earthSunDist, "r") as EA_dist_f:
            EA_dist_dict = {}
            for line in EA_dist_f:
                date, EA = [item.strip() for item in line.split(",")]
                EA_dist_dict[date] = EA

        return float(EA_dist_dict[acqDate.strftime('%Y-%m-%d')])
290

291
    def read_metadata(self, observation_time: datetime, lon_lat_smpl, nsmile_coef):
292
293
294
295
296
297
        """Read the metadata of the whole EnMAP L1B product in sensor geometry.

        :param observation_time:  date and time of image observation (datetime.datetime)
        :param lon_lat_smpl:  number if sampling points in lon, lat fields
        :param nsmile_coef:  number of polynomial coefficients for smile
        """
Daniel Scheffler's avatar
Daniel Scheffler committed
298
        self.read_common_meta(observation_time)
299
        self.vnir = EnMAP_Metadata_L1B_Detector_SensorGeo('VNIR', config=self.cfg, logger=self.logger)
300
        self.vnir.read_metadata(self._path_xml, lon_lat_smpl=lon_lat_smpl, nsmile_coef=nsmile_coef)
301
        self.swir = EnMAP_Metadata_L1B_Detector_SensorGeo('SWIR', config=self.cfg, logger=self.logger)
302
        self.swir.read_metadata(self._path_xml, lon_lat_smpl=lon_lat_smpl, nsmile_coef=nsmile_coef)