Commit ff5dad9c authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Further developed spectral homogenization.

parent a1b47913
......@@ -32,6 +32,7 @@ from ..options.config import GMS_config as CFG
from ..io.input_reader import SRF # noqa F401 # flake8 issue
from ..misc.logging import GMS_logger
from ..misc.definition_dicts import datasetid_dict
from ..model.metadata import get_LayerBandsAssignment
from .L2A_P import L2A_object
__author__ = 'Daniel Scheffler'
......@@ -113,7 +114,16 @@ class SpectralHomogenizer(object):
@staticmethod
def interpolate_cube(arrcube, source_CWLs, target_CWLs, kind='linear'):
# type: (Union[np.ndarray, GeoArray], list, list) -> np.ndarray
assert kind in ['linear', ], "%s is not a supported kind of spectral interpolation." % kind
"""Spectrally nterpolate the spectral bands of a remote sensing image to new band positions.
:param arrcube: array to be spectrally interpolated
:param source_CWLs: list of source central wavelength positions
:param target_CWLs: list of target central wavelength positions
:param kind: interpolation kind to be passed to scipy.interpolate.interp1d (default: 'linear')
:return:
"""
assert kind in ['linear', 'nearest', 'zero', 'slinear', 'quadratic', 'cubic'], \
"%s is not a supported kind of spectral interpolation." % kind
assert arrcube is not None,\
'L2B_obj.interpolate_cube_linear expects a numpy array as input. Got %s.' % type(arrcube)
......@@ -798,11 +808,13 @@ class ReferenceCube_Generator(object):
# resample the set of random spectra to match the spectral characteristics of all target sensors
for tgt_sat, tgt_sen in self.tgt_sat_sen_list:
GMS_identifier = dict(Satellite=tgt_sat, Sensor=tgt_sen, Subsystem=None, image_type='RSD',
proc_level='L1A', logger=self.logger) # use L1A to have all bands available
tgt_LBA = get_LayerBandsAssignment(GMS_identifier)
# perform spectal resampling
self.logger.info('Performing spectral resampling to match %s %s specifications...' % (tgt_sat, tgt_sen))
tgt_srf = SRF(dict(Satellite=tgt_sat, Sensor=tgt_sen, Subsystem=None, image_type='RSD',
proc_level='L1A', logger=self.logger))
tgt_srf = SRF()
unif_random_spectra_rsp = \
self.resample_spectra(unif_random_spectra,
src_cwl=np.array(src_im.meta.loc['wavelength'], dtype=np.float).flatten(),
......@@ -1054,8 +1066,16 @@ class Classifier_Generator(object):
ML = specHomoApproaches[method]()
ML.fit(train_X, train_Y)
# append the scores
# append some metadata
ML.scores = dict(train=ML.score(train_X, train_Y), test=ML.score(test_X, test_Y))
ML.src_satellite = src_cube.satellite
ML.src_sensor = src_cube.sensor
ML.tgt_satellite = tgt_cube.satellite
ML.tgt_sensor = tgt_cube.sensor
ML.src_LBA = []
ML.tgt_LBA = []
ML.src_n_bands = len(ML.src_LBA)
ML.tgt_n_bands = len(ML.tgt_LBA)
# dump to disk
with open(os.path.join(outDir, fName_cls), 'wb') as outF:
......@@ -1097,11 +1117,16 @@ class RSImage_Predictor(object):
return ML_instance
def predict(self, image, src_satellite, src_sensor, tgt_satellite, tgt_sensor, nodataVal=None):
def predict(self, image, src_satellite, src_sensor, tgt_satellite, tgt_sensor, nodataVal=None, CPUs=1):
spectra = im2spectra(image)
ML = self.get_classifier(src_satellite, src_sensor, tgt_satellite, tgt_sensor)
spectra_predicted = ML.predict(spectra).astype(image.dtype)
classifier = self.get_classifier(src_satellite, src_sensor, tgt_satellite, tgt_sensor)
# adjust classifier
if CPUs is None or CPUs > 1:
classifier.n_jobs = cpu_count() if CPUs is None else CPUs
spectra_predicted = classifier.predict(spectra).astype(image.dtype)
# 2D -> 3D
image_predicted = spectra2im(spectra_predicted, rows=image.shape[0], cols=image.shape[1])
......
......@@ -1852,7 +1852,7 @@ def get_LayerBandsAssignment(GMS_identifier, nBands=None, ignore_usecase=False,
path_ac_options = get_path_ac_options(GMS_identifier)
if path_ac_options and os.path.exists(path_ac_options):
# FIXME this does not work for L7
# FIXME don't validate because options contain pathes that do not exist on another server
# NOTE: don't validate because options contain pathes that do not exist on another server
ac_bandNs = get_ac_options(path_ac_options, validation=False)['AC']['bands']
ac_out_LBA = [bN.split('B0')[1] if bN.startswith('B0') else bN.split('B')[1] for bN in ac_bandNs]
LayerBandsAssignment = [i for i in LayerBandsAssignment if i in ac_out_LBA]
......
......@@ -39,6 +39,9 @@ class GMS_configuration(object):
else:
raise EnvironmentError("Config has not been set already on this machine. Run 'set_config()' first!'")
def __repr__(self):
return getattr(builtins, 'GMS_JobConfig').__repr__()
GMS_config = GMS_configuration() # type: JobConfig
......
......@@ -19,6 +19,7 @@ from gms_preprocessing import set_config # noqa E402 module level import not at
from gms_preprocessing.algorithms.L2B_P import ReferenceCube_Generator_OLD # noqa E402 module level import not at top of file
from gms_preprocessing.algorithms.L2B_P import ReferenceCube_Generator # noqa E402 module level import not at top of file
from gms_preprocessing.algorithms.L2B_P import RefCube # noqa E402 module level import not at top of file
from gms_preprocessing.algorithms.L2B_P import SpectralHomogenizer # noqa E402 module level import not at top of file
testdata = os.path.join(__path__[0], '../tests/data/hy_spec_data/Bavaria_farmland_LMU_Hyspex_subset.bsq')
......@@ -126,3 +127,46 @@ class Test_ReferenceCube_Generator(unittest.TestCase):
self.assertTrue(np.any(ref_cube_sp), msg='Singleprocessing result is empty.')
self.assertTrue(np.any(ref_cube_mp), msg='Multiprocessing result is empty.')
class Test_SpectralHomogenizer(unittest.TestCase):
"""Tests class for gms_preprocessing.algorithms.L2B_P.Test_SpectralHomogenizer"""
@classmethod
def setUpClass(cls):
# Testjob Landsat-8
cfg = set_config(exec_mode='Python', job_ID=26186196, db_host='geoms', reset_status=True, is_test=True)
cls.SpH = SpectralHomogenizer(classifier_rootDir=cfg.path_spechomo_classif)
cls.testArr_L8 = GeoArray(np.random.randint(1, 10000, (50, 50, 8), dtype=np.int16)) # no band 9, no pan
cls.cwl_L8 = [442.98, 482.59, 561.33, 654.61, 864.57, 1609.09, 2201.25]
def test_interpolate_cube_linear(self):
outarr = self.SpH.interpolate_cube(self.testArr_L8, self.cwl_L8, [500., 700., 1300.], kind='linear')
self.assertIsInstance(outarr, np.ndarray)
self.assertEqual(outarr.shape, (50, 50, 3))
self.assertEqual(outarr.dtype, np.int16)
def test_interpolate_cube_quadratic(self):
outarr = self.SpH.interpolate_cube(self.testArr_L8, self.cwl_L8, [500., 700., 1300.], kind='quadratic')
self.assertIsInstance(outarr, np.ndarray)
self.assertEqual(outarr.shape, (50, 50, 3))
self.assertEqual(outarr.dtype, np.int16)
def test_predict_by_machine_learner__LR_L8_S2(self):
"""Test linear regression from Landsat-8 to Sentinel-2A."""
predarr = self.SpH.predict_by_machine_learner(self.testArr_L8, method='LR',
src_satellite='Landsat-8', src_sensor='OLI_TIRS',
tgt_satellite='Sentinel-2A', tgt_sensor='MSI')
self.assertIsInstance(predarr, np.ndarray)
self.assertEqual(predarr.shape, (50, 50, 13))
self.assertEqual(predarr.dtype, np.int16)
def test_predict_by_machine_learner__RR_L8_S2(self):
"""Test ridge regression from Landsat-8 to Sentinel-2A."""
predarr = self.SpH.predict_by_machine_learner(self.testArr_L8, method='RR',
src_satellite='Landsat-8', src_sensor='OLI_TIRS',
tgt_satellite='Sentinel-2A', tgt_sensor='MSI')
self.assertIsInstance(predarr, np.ndarray)
self.assertEqual(predarr.shape, (50, 50, 13))
self.assertEqual(predarr.dtype, np.int16)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment