Commit a1b47913 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

First running version of Spectral Homogenization via Linear Regression.

parent d1e17d47
Pipeline #1704 failed with stage
in 11 minutes and 59 seconds
......@@ -31,6 +31,7 @@ from py_tools_ds.processing.progress_mon import ProgressBar
from ..options.config import GMS_config as CFG
from ..io.input_reader import SRF # noqa F401 # flake8 issue
from ..misc.logging import GMS_logger
from ..misc.definition_dicts import datasetid_dict
from .L2A_P import L2A_object
__author__ = 'Daniel Scheffler'
......@@ -51,35 +52,86 @@ class L2B_object(L2A_object):
src_cwls = self.meta_odict['wavelength']
# FIXME exclude or include thermal bands; respect sorted CWLs in context of LayerBandsAssignment
tgt_cwls = CFG.target_CWL
if src_cwls != tgt_cwls:
assert kind in ['linear', ], "%s is not a supported kind of homogenization." % kind
self.log_for_fullArr_or_firstTile(
'Performing spectral homogenization (%s) with target wavelength positions at %s nm.'
% (kind, ', '.join(np.array(tgt_cwls[:-1]).astype(str))+' and %s' % tgt_cwls[-1]))
method = CFG.spechomo_method
# TODO better band names for homogenized product -> include in get_LayerBandsAssignment
self.LayerBandsAssignment = []
self.arr = self.interpolate_cube_linear(self.arr, src_cwls, tgt_cwls) if kind == 'linear' else self.arr
if self.dataset_ID == CFG.datasetid_spectral_ref:
self.logger.info("Spectral homogenization has been skipped because the dataset id equals the dataset id of "
"the spectral refernce sensor.")
return
self.meta_odict['wavelength'] = list(tgt_cwls)
self.meta_odict['bands'] = len(tgt_cwls)
if 'band names' in self.meta_odict: # FIXME bug workaround
del self.meta_odict['band names'] # TODO
else:
if src_cwls == tgt_cwls:
self.logger.info("Spectral homogenization has been skipped because the current spectral characteristics "
"are already equal to the target sensor's.")
return
# perform spectral homogenization!
SpH = SpectralHomogenizer(classifier_rootDir=CFG.path_spechomo_classif)
if method == 'LI' or CFG.datasetid_spectral_ref is None:
# linear interpolation
# or a custom sensor has been specified -> no classifier for that case available -> linear interpolation
self.log_for_fullArr_or_firstTile(
'Performing spectral homogenization (%s) with target wavelength positions at %s nm.'
% (kind, ', '.join(np.array(tgt_cwls[:-1]).astype(str)) + ' and %s' % tgt_cwls[-1]))
outArr = SpH.interpolate_cube(self.arr, src_cwls, tgt_cwls, kind='linear')
else:
# a known sensor has been specified as spectral reference => apply a machine learner
try:
outArr = SpH.predict_by_machine_learner(self.arr,
method=method,
src_satellite=self.satellite,
src_sensor=self.sensor,
tgt_satellite=datasetid_dict[CFG.datasetid_spectral_ref][0],
tgt_sensor=datasetid_dict[CFG.datasetid_spectral_ref][1],
nodataVal=self.arr.nodata)
except FileNotFoundError:
self.logger.warning('No machine learning classifier available that fulfills the specifications of the '
'spectral reference sensor. Falling back to linear interpolation for performing '
'spectral homogenization.')
self.logger.info(
'Performing spectral homogenization (%s) with target wavelength positions at %s nm.'
% (kind, ', '.join(np.array(tgt_cwls[:-1]).astype(str)) + ' and %s' % tgt_cwls[-1]))
outArr = SpH.interpolate_cube(self.arr, src_cwls, tgt_cwls, kind='linear')
self.arr = outArr
# update metadata
# TODO better band names for homogenized product -> include in get_LayerBandsAssignment
self.LayerBandsAssignment = []
self.meta_odict['wavelength'] = list(tgt_cwls)
self.meta_odict['bands'] = len(tgt_cwls)
if 'band names' in self.meta_odict: # FIXME bug workaround
del self.meta_odict['band names'] # TODO
class SpectralHomogenizer(object):
def __init__(self, classifier_rootDir=''):
self.classifier_rootDir = classifier_rootDir or CFG.path_spechomo_classif
@staticmethod
def interpolate_cube_linear(arrcube, source_CWLs, target_CWLs):
def interpolate_cube(arrcube, source_CWLs, target_CWLs, kind='linear'):
# type: (Union[np.ndarray, GeoArray], list, list) -> np.ndarray
assert kind in ['linear', ], "%s is not a supported kind of spectral interpolation." % kind
assert arrcube is not None,\
'L2B_obj.interpolate_cube_linear expects a numpy array as input. Got %s.' % type(arrcube)
orig_CWLs, target_CWLs = np.array(source_CWLs), np.array(target_CWLs)
outarr = interp1d(np.array(orig_CWLs), arrcube, axis=2, kind='linear', fill_value='extrapolate')(target_CWLs)
outarr = interp1d(np.array(orig_CWLs), arrcube, axis=2, kind=kind, fill_value='extrapolate')(target_CWLs)
outarr = outarr.astype(np.int16)
assert outarr.shape == tuple([*arrcube.shape[:2], len(target_CWLs)])
return outarr
def predict_by_machine_learner(self, arrcube, method, src_satellite, src_sensor, tgt_satellite, tgt_sensor,
nodataVal=None):
PR = RSImage_Predictor(method=method, classifier_rootDir=self.classifier_rootDir)
predicted_arr = PR.predict(arrcube, src_satellite, src_sensor, tgt_satellite, tgt_sensor, nodataVal=nodataVal)
return predicted_arr
class SpectralResampler(object):
"""Class for spectral resampling of a single spectral signature (1D-array) or an image (3D-array)."""
......@@ -353,14 +405,14 @@ class KMeansRSImage(object):
return random_samples
class _MachineLearner_RSImage(object):
class TrainingData(object):
def __init__(self, im_X, im_Y, test_size):
self.im_X = GeoArray(im_X)
self.im_Y = GeoArray(im_Y)
# Set spectra (3D to 2D conversion)
self.spectra_X = self._im2spectra(self.im_X)
self.spectra_Y = self._im2spectra(self.im_Y)
self.spectra_X = im2spectra(self.im_X)
self.spectra_Y = im2spectra(self.im_Y)
# Set train and test variables
# NOTE: If random_state is set to an Integer, train_test_split will always select the same 'pseudo-random' set
......@@ -368,37 +420,6 @@ class _MachineLearner_RSImage(object):
self.train_X, self.test_X, self.train_Y, self.test_Y = \
train_test_split(self.spectra_X, self.spectra_Y, test_size=test_size, shuffle=True, random_state=0)
@classmethod
def from_dump(cls, dillFile):
# type: (str) -> _MachineLearner_RSImage
with open(dillFile, 'rb') as inF:
ML_instance = dill.load(inF)
if not isinstance(ML_instance, cls):
raise ValueError('The given dillFile does not contain an instance of %s but %s.'
% (cls.__name__, type(ML_instance)))
return ML_instance
def __getstate__(self):
def dump(self, path_out, exclude_arrays=True):
if exclude_arrays:
instance2dump = copy()
with open(path_out, 'wb') as outF:
dill.dump(self, outF)
@staticmethod
def _im2spectra(geoArr):
"""Convert images to array of spectra samples (rows: samples; cols: spectral information)."""
return geoArr.reshape((geoArr.rows * geoArr.cols, geoArr.bands))
@staticmethod
def _spectra2im(spectra, rows, cols):
"""Convert array of spectra samples (rows: samples; cols: spectral information) to a 3D image."""
return spectra.reshape(rows, cols, spectra.shape[1])
def plot_scatter_matrix(self, figsize=(15, 15), mode='intersensor'):
train_X = self.train_X[np.random.choice(self.train_X.shape[0], 1000, replace=False), :]
train_Y = self.train_Y[np.random.choice(self.train_Y.shape[0], 1000, replace=False), :]
......@@ -439,76 +460,115 @@ class _MachineLearner_RSImage(object):
scatter_matrix(df, figsize=figsize, marker='.', hist_kwds={'bins': 50}, s=30, alpha=0.8)
plt.suptitle('Image Y band to band correlation')
def plot_scattermatrix(self):
import seaborn
class LinearRegression_RSImage(_MachineLearner_RSImage):
def __init__(self, im_X, im_Y, test_size=0.6):
super(LinearRegression_RSImage, self).__init__(im_X, im_Y, test_size=test_size)
self.linRegressor = LinearRegression().fit(self.train_X, self.train_Y) # type: LinearRegression
@property
def coefficients_(self):
# type: () -> np.ndarray
"""Return derived coefficients of the fitted function (also called 'weights' or 'slope' parameters)."""
return self.linRegressor.coef_
@property
def intercept_(self):
# type: () -> np.ndarray
"""Return derived intercept (or 'offset') of the fitted function."""
return self.linRegressor.intercept_
@property
def scores(self):
"""The R² values of the regression for the train and the test data."""
return dict(train=self.linRegressor.score(self.train_X, self.train_Y),
test=self.linRegressor.score(self.test_X, self.test_Y))
def predict(self, im_Y, nodataVal=None):
# type: (Union[GeoArray, np.ndarray]) -> np.ndarray
# 3D -> 2D
spectra_Y = self._im2spectra(im_Y)
# predict
spectra_predicted = self.linRegressor.predict(spectra_Y).astype(im_Y.dtype)
# 2D -> 3D
image_predicted = self._spectra2im(spectra_predicted, rows=im_Y.shape[0], cols=im_Y.shape[1])
fig, axes = plt.subplots(self.src_cube.data.bands, self.tgt_cube.data.bands,
figsize=(25, 9), sharex='all', sharey='all')
fig.suptitle('Correlation of %s and %s bands' % (self.src_cube.satellite, self.tgt_cube.satellite), size=25)
# re-apply nodata values to predicted result
if nodataVal is not None:
im_Y_gA = GeoArray(im_Y, nodata=nodataVal)
image_predicted[im_Y_gA.mask_nodata[:] == 0] = nodataVal
color = seaborn.hls_palette(13)
return image_predicted
for i, ax in zip(range(6), axes.flatten()):
for j, ax in zip(range(13), axes.flatten()):
axes[i, j].scatter(train_s2[:, j], train_s3[:, 5 - i], c=color[j], label=str(j))
axes[i, j].set_xlim(-0.1, 1.1)
axes[i, j].set_ylim(-0.1, 1.1)
if j == 8:
axes[5, j].set_xlabel('S2 B8A\n' + str(metadata_s2['Bands_S2'][j]) + ' nm', size=10)
elif j in range(9, 13):
axes[5, j].set_xlabel('S2 B' + str(j) + '\n' + str(metadata_s2['Bands_S2'][j]) + ' nm', size=10)
else:
axes[5, j].set_xlabel('S2 B' + str(j + 1) + '\n' + str(metadata_s2['Bands_S2'][j]) + ' nm', size=10)
axes[i, 0].set_ylabel('S3 SLSTR B' + str(6 - i) + '\n' + str(metadata_s3['Bands_S3'][5 - i]) + ' nm',
size=10)
axes[4, j].set_xticks(np.arange(0, 1.2, 0.2))
axes[i, j].plot([0, 1], [0, 1], c='red')
def show_band_scatterplot(self, band_src_im, band_tgt_im):
from scipy.stats import gaussian_kde
class RidgeRegression_RSImage(_MachineLearner_RSImage):
def __init__(self, im_X, im_Y, test_size=0.6):
# TODO
raise NotImplementedError()
super(RidgeRegression_RSImage, self).__init__(im_X, im_Y, test_size=test_size)
x = self.src_cube.data[band_src_im].flatten()[:10000]
y = self.tgt_cube.data[band_tgt_im].flatten()[:10000]
self.ridgeRegressor = RidgeClassifier().fit(self.train_X, self.train_Y)
# Calculate the point density
xy = np.vstack([x, y])
z = gaussian_kde(xy)(xy)
@property
def coefficients_(self):
# type: () -> np.ndarray
"""Return derived coefficients of the fitted function (also called 'weights' or 'slope' parameters)."""
return self.ridgeRegressor.coef_
fig = plt.figure(figsize=(15, 15))
plt.scatter(x, y, c=z, s=30, edgecolor='')
plt.show()
@property
def intercept_(self):
# type: () -> np.ndarray
"""Return derived intercept (or 'offset') of the fitted function."""
return self.ridgeRegressor.intercept_
@property
def scores(self):
"""The R² values of the regression for the train and the test data."""
return dict(train=self.ridgeRegressor.score(self.train_X, self.train_Y),
test=self.ridgeRegressor.score(self.test_X, self.test_Y))
def im2spectra(geoArr):
"""Convert images to array of spectra samples (rows: samples; cols: spectral information)."""
return geoArr.reshape((geoArr.rows * geoArr.cols, geoArr.bands))
def spectra2im(spectra, rows, cols):
"""Convert array of spectra samples (rows: samples; cols: spectral information) to a 3D image."""
return spectra.reshape(rows, cols, spectra.shape[1])
# class _MachineLearner_RSImage(object):
# @staticmethod
# def scores(machine_learner, training_data):
# # type: (Union[LinearRegression_RSImage, RidgeRegression_RSImage], TrainingData) -> dict
# """The R² values of the regression for the train and the test data."""
# return dict(train=machine_learner.score(training_data.train_X, training_data.train_Y),
# test=machine_learner.score(training_data.test_X, training_data.test_Y))
#
# @staticmethod
# def predict(machine_learner, im_Y, nodataVal=None):
# # type: (Union[LinearRegression_RSImage, RidgeRegression_RSImage], Union[GeoArray, np.ndarray]) -> np.ndarray
#
# # 3D -> 2D
# spectra_Y = im2spectra(im_Y)
#
# # predict
# spectra_predicted = machine_learner.predict(spectra_Y).astype(im_Y.dtype)
#
# # 2D -> 3D
# image_predicted = spectra2im(spectra_predicted, rows=im_Y.shape[0], cols=im_Y.shape[1])
#
# # re-apply nodata values to predicted result
# if nodataVal is not None:
# im_Y_gA = GeoArray(im_Y, nodata=nodataVal)
# image_predicted[im_Y_gA.mask_nodata[:] == 0] = nodataVal
#
# return image_predicted
#
# @classmethod
# def from_dump(cls, dillFile):
# # type: (str) -> _MachineLearner_RSImage
# with open(dillFile, 'rb') as inF:
# ML_instance = dill.load(inF)
#
# if not isinstance(ML_instance, cls):
# raise ValueError('The given dillFile does not contain an instance of %s but %s.'
# % (cls.__name__, type(ML_instance)))
# return ML_instance
#
# def dump(self, path_out):
# with open(path_out, 'wb') as outF:
# dill.dump(self, outF)
#
#
# class LinearRegression_RSImage(LinearRegression, _MachineLearner_RSImage):
# """Class for applying Linear Regression to an remote sensing image."""
#
# def fit(self, train_X, train_Y, **kwargs):
# super(LinearRegression_RSImage, self).fit(train_X, train_Y, **kwargs)
#
# def predict(self, X):
# super(_MachineLearner_RSImage, self).predict
#
#
# class RidgeRegression_RSImage(RidgeClassifier, _MachineLearner_RSImage):
# """Class for applying Linear Regression to an remote sensing image."""
#
# def fit(self, train_X, train_Y, **kwargs):
# super(RidgeRegression_RSImage, self).fit(train_X, train_Y, **kwargs)
class ReferenceCube_Generator_OLD(object):
......@@ -970,73 +1030,85 @@ class Classifier_Trainer(object):
self.src_cube = src_sensor_refcube if isinstance(src_sensor_refcube, RefCube) else RefCube(src_sensor_refcube)
self.tgt_cube = tgt_sensor_refcube if isinstance(src_sensor_refcube, RefCube) else RefCube(src_sensor_refcube)
def plot_scattermatrix(self):
import seaborn
fig, axes = plt.subplots(self.src_cube.data.bands, self.tgt_cube.data.bands,
figsize=(25, 9), sharex='all', sharey='all')
fig.suptitle('Correlation of %s and %s bands' % (self.src_cube.satellite, self.tgt_cube.satellite), size=25)
class Classifier_Generator(object):
def __init__(self, list_refcubes):
# type: (List[Union[str, RefCube]]) -> None
self.refcubes = [RefCube(inRC) if isinstance(inRC, str) else inRC for inRC in list_refcubes]
color = seaborn.hls_palette(13)
def create_classifiers(self, outDir, method='LR', *args, **kwargs):
for src_cube, tgt_cube in itertools.permutations(self.refcubes, r=2): # type: RefCube
fName_cls = get_classifier_filename(method, src_cube.satellite, src_cube.sensor,
tgt_cube.satellite, tgt_cube.sensor)
for i, ax in zip(range(6), axes.flatten()):
for j, ax in zip(range(13), axes.flatten()):
axes[i, j].scatter(train_s2[:, j], train_s3[:, 5 - i], c=color[j], label=str(j))
axes[i, j].set_xlim(-0.1, 1.1)
axes[i, j].set_ylim(-0.1, 1.1)
if j == 8:
axes[5, j].set_xlabel('S2 B8A\n' + str(metadata_s2['Bands_S2'][j]) + ' nm', size=10)
elif j in range(9, 13):
axes[5, j].set_xlabel('S2 B' + str(j) + '\n' + str(metadata_s2['Bands_S2'][j]) + ' nm', size=10)
else:
axes[5, j].set_xlabel('S2 B' + str(j + 1) + '\n' + str(metadata_s2['Bands_S2'][j]) + ' nm', size=10)
axes[i, 0].set_ylabel('S3 SLSTR B' + str(6 - i) + '\n' + str(metadata_s3['Bands_S3'][5 - i]) + ' nm',
size=10)
axes[4, j].set_xticks(np.arange(0, 1.2, 0.2))
axes[i, j].plot([0, 1], [0, 1], c='red')
print("Creating classifier %s..." % fName_cls)
def show_band_scatterplot(self, band_src_im, band_tgt_im):
from scipy.stats import gaussian_kde
# Set train and test variables
# NOTE: If random_state is set to an Integer, train_test_split will always select the same 'pseudo-random'
# set of the input data.
train_X, test_X, train_Y, test_Y = \
train_test_split(im2spectra(src_cube.data), im2spectra(tgt_cube.data),
test_size=0.4, shuffle=True, random_state=0)
x = self.src_cube.data[band_src_im].flatten()[:10000]
y = self.tgt_cube.data[band_tgt_im].flatten()[:10000]
# train the model
ML = specHomoApproaches[method]()
ML.fit(train_X, train_Y)
# Calculate the point density
xy = np.vstack([x, y])
z = gaussian_kde(xy)(xy)
# append the scores
ML.scores = dict(train=ML.score(train_X, train_Y), test=ML.score(test_X, test_Y))
fig = plt.figure(figsize=(15, 15))
plt.scatter(x, y, c=z, s=30, edgecolor='')
plt.show()
# dump to disk
with open(os.path.join(outDir, fName_cls), 'wb') as outF:
dill.dump(ML, outF)
class Classifier_Generator(object):
def __init__(self, list_refcubes):
# type: (List[Union[str, RefCube]]) -> None
self.refcubes = [RefCube(inRC) if isinstance(inRC, str) else inRC for inRC in list_refcubes]
specHomoApproaches = dict(
LR=LinearRegression,
RR=RidgeClassifier
)
def create_classifiers(self, outDir, clsApproach='LR', *args, **kwargs):
for src_cube, tgt_cube in itertools.combinations(self.refcubes, r=2): # type: RefCube
print(src_cube.satellite, src_cube.sensor, tgt_cube.satellite, tgt_cube.sensor)
fName_cls = '__'.join([clsApproach, src_cube.satellite, src_cube.sensor]) + \
'__to__' + '__'.join([tgt_cube.satellite, tgt_cube.sensor]) + '.dill'
def get_classifier_filename(method, src_satellite, src_sensor, tgt_satellite, tgt_sensor):
return '__'.join([method, src_satellite, src_sensor]) + \
'__to__' + '__'.join([tgt_satellite, tgt_sensor]) + '.dill'
print("Creating classifier %s..." % fName_cls)
ML = specHomoApproaches[clsApproach](src_cube.data, tgt_cube.data)
ML.dump(os.path.join(outDir, fName_cls))
class RSImage_Predictor(object):
def __init__(self, method='LR', classifier_rootDir=''):
# type (str, str) -> None
self.method = method
self.classifier_rootDir = os.path.abspath(classifier_rootDir)
def get_classifier(self, src_satellite, src_sensor, tgt_satellite, tgt_sensor):
fName_cls = get_classifier_filename(self.method, src_satellite, src_sensor, tgt_satellite, tgt_sensor)
path_cls = os.path.join(self.classifier_rootDir, fName_cls)
class SpecHomo_Classifier(object):
def __init__(self):
pass
if not os.path.isfile(path_cls):
raise FileNotFoundError('No classifier available for the given specification at %s.' % path_cls)
def from_refcubes(self, dir_refcubes):
pass
with open(path_cls, 'rb') as inF:
ML_instance = dill.load(inF)
# validation
exp = specHomoApproaches[self.method]
if not isinstance(ML_instance, exp):
raise ValueError('The given dillFile %s does not contain an instance of %s but %s.'
% (os.path.basename(fName_cls), exp.__name__, type(ML_instance)))
specHomoApproaches = dict(
LR=LinearRegression_RSImage,
RR=RidgeRegression_RSImage
)
return ML_instance
def predict(self, image, src_satellite, src_sensor, tgt_satellite, tgt_sensor, nodataVal=None):
spectra = im2spectra(image)
ML = self.get_classifier(src_satellite, src_sensor, tgt_satellite, tgt_sensor)
spectra_predicted = ML.predict(spectra).astype(image.dtype)
# 2D -> 3D
image_predicted = spectra2im(spectra_predicted, rows=image.shape[0], cols=image.shape[1])
# re-apply nodata values to predicted result
if nodataVal is not None:
im_Y_gA = GeoArray(image, nodata=nodataVal)
image_predicted[im_Y_gA.mask_nodata[:] == 0] = nodataVal
return image_predicted
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment