Commit a8542b88 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Further developed SpecHomo_Classifier to ReferenceCube_Generator. Updated tests.

parent 2736d732
Pipeline #1697 failed with stage
in 7 minutes and 52 seconds
......@@ -12,10 +12,11 @@ import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from pandas import DataFrame
from pandas.plotting import scatter_matrix
from typing import Union, List # noqa F401 # flake8 issue
from typing import Union, List, Dict, Tuple # noqa F401 # flake8 issue
from tqdm import tqdm
from multiprocessing import Pool, cpu_count
from glob import glob
import re
from sklearn.cluster import k_means_ # noqa F401 # flake8 issue
from sklearn.model_selection import train_test_split
......@@ -433,7 +434,7 @@ class RidgeRegression_RSImage(_MachineLearner_RSImage):
test=self.ridgeRegressor.score(self.test_X, self.test_Y))
class SpecHomo_Classifier(object):
class ReferenceCube_Generator_OLD(object):
def __init__(self, filelist_refs, v=False, logger=None, CPUs=None):
# type: (List[str], bool, logging.Logger, Union[None, int]) -> None
"""
......@@ -566,15 +567,27 @@ class SpecHomo_Classifier(object):
return random_samples
class SpecHomo_Classifier2(object):
def __init__(self, filelist_refs, dir_refcubes='', v=False, logger=None, CPUs=None):
# type: (List[str], str, bool, logging.Logger, Union[None, int]) -> None
"""
class ReferenceCube_Generator(object):
"""Class for creating reference cube that are later used as training data for SpecHomo_Classifier."""
:param filelist_refs: list of reference images
def __init__(self, filelist_refs, tgt_sat_sen_list=None, dir_refcubes='', n_clusters=10, tgt_n_samples=1000,
v=False, logger=None, CPUs=None):
# type: (List[str], List[Tuple[str, str]], str, int, int, bool, logging.Logger, Union[None, int]) -> None
"""Initialize ReferenceCube_Generator.
:param filelist_refs: list of (hyperspectral) reference images
:param tgt_sat_sen_list: list of satellite/sensor tuples containing those sensors for which the reference cube
is to be computed, e.g. [('Landsat-8', 'OLI_TIRS',), ('Landsat-5', 'TM')]
:param dir_refcubes: output directory for the generated reference cube
:param n_clusters: number of clusters to be used for clustering the input images (KMeans)
:param tgt_n_samples: number of spectra to be collected from each input image
:param v: verbose mode
:param logger: instance of logging.Logger()
:param CPUs: number of CPUs to use for computation
"""
# defaults
self.tgt_sat_sen_list = [
# args + kwargs
self.ims_ref = [filelist_refs, ] if isinstance(filelist_refs, str) else filelist_refs
self.tgt_sat_sen_list = tgt_sat_sen_list or [
('Landsat-8', 'OLI_TIRS'),
('Landsat-7', 'ETM+'),
('Landsat-5', 'TM'),
......@@ -586,47 +599,69 @@ class SpecHomo_Classifier2(object):
('SPOT-5', 'HRG2'),
('RapidEye-5', 'MSI')
]
self.tmpdir_multiproc = ''
# privates
self._refcubes = {sat_sen: None for sat_sen in self.tgt_sat_sen_list}
# args + kwargs
self.ims_ref = [filelist_refs, ] if isinstance(filelist_refs, str) else filelist_refs
self.dir_refcubes = os.path.abspath(dir_refcubes)
self.dir_refcubes = os.path.abspath(dir_refcubes) if dir_refcubes else ''
self.n_clusters = n_clusters
self.tgt_n_samples = tgt_n_samples
self.v = v
self.logger = logger or GMS_logger(__name__) # must be pickable
self.CPUs = CPUs or cpu_count()
# privates
self._refcubes = {sat_sen: None for sat_sen in self.tgt_sat_sen_list}
# validation
if not os.path.isdir(self.dir_refcubes):
if dir_refcubes and not os.path.isdir(self.dir_refcubes):
raise ValueError("%s is not a directory." % self.dir_refcubes)
@property
def refcubes(self):
    # type: () -> Dict[Tuple[str, str], GeoArray]
    """Return the in-memory reference cubes as a {(satellite, sensor): GeoArray} dictionary.

    If an output directory is set, reference cubes already existing on disk are imported,
    but only if their filename encodes the same satellite/sensor, cluster count and sample
    count as the current target specifications.
    """
    if self.dir_refcubes:
        for path_refcube in glob(os.path.join(self.dir_refcubes, 'refcube__*.bsq')):
            # check if the current refcube path matches the target refcube specifications;
            # filenames look like 'refcube__<sat>__<sen>__nclust<N>__nsamp<M>.bsq'
            identifier = re.search('refcube__(.*).bsq', os.path.basename(path_refcube)).group(1)
            sat, sen, nclust_str, nsamp_str = identifier.split('__')  # type: str
            # FIX: the numeric part follows the 'nclust'/'nsamp' prefix, so it is at split
            # index 1 (index 0 is the empty string before the prefix), and nsamp_str must be
            # split on 'nsamp', not 'nclust' (the original code raised ValueError on int('')).
            nclust, nsamp = int(nclust_str.split('nclust')[1]), int(nsamp_str.split('nsamp')[1])
            correct_specs = all([(sat, sen) in self.tgt_sat_sen_list,
                                 nclust == self.n_clusters,
                                 nsamp == self.tgt_n_samples])

            # import the existing ref cube if it matches the target refcube specs
            if correct_specs:
                self._refcubes[(sat, sen)] = GeoArray(path_refcube)

    return self._refcubes
def generate_reference_cubes(self, tgt_sat_sen_list=None, n_clusters=10, tgt_n_samples=1000,
fmt_out='ENVI', progress=True):
self.tgt_sat_sen_list = tgt_sat_sen_list or self.tgt_sat_sen_list
def generate_reference_cubes(self, fmt_out='ENVI', progress=True):
# type: (str, bool) -> dict  # returns self.refcubes
"""Generate reference spectra from all hyperspectral input images.
Workflow:
1. Clustering/classification of hyperspectral images and selection of a given number of random signatures
(a. Spectral downsamling to lower spectral resolution (speedup))
b. KMeans clustering
c. Selection of the same number of signatures from each cluster to avoid unequal amount of training data.
2. Spectral resampling of the selected hyperspectral signatures (for each input image)
3. Add resampled spectra to reference cubes for each target sensor and write cubes to disk
:param fmt_out: output format (GDAL driver code)
:param progress: show progress bar (default: True)
:return: np.array: [tgt_n_samples x images x spectral bands of the target sensor]
"""
for im in self.ims_ref:
src_im = GeoArray(im)
unif_random_spectra = \
self.cluster_image_and_get_uniform_spectra(
src_im, n_clusters=n_clusters, tgt_n_samples=tgt_n_samples, progress=progress)
# get random spectra of the original (hyperspectral) image, equally distributed over all computed clusters
unif_random_spectra = self.cluster_image_and_get_uniform_spectra(src_im, progress=progress)
# resample the set of random spectra to match the spectral characteristics of all target sensors
for tgt_sat, tgt_sen in self.tgt_sat_sen_list:
# perform spectal resampling
self.logger.info('Performing spectral resampling to match %s %s specifications...' % (tgt_sat, tgt_sen))
tgt_srf = SRF(dict(Satellite=tgt_sat, Sensor=tgt_sen, Subsystem=None, image_type='RSD',
proc_level='L1A', logger=self.logger))
......@@ -635,71 +670,57 @@ class SpecHomo_Classifier2(object):
src_cwl=np.array(src_im.meta.loc['wavelength'], dtype=np.float).flatten(),
tgt_srf=tgt_srf)
# add the spectra as GeoArray instance to the in-mem ref cubes
self.add_spectra_to_refcube(unif_random_spectra_rsp,
tgt_sat_sen=(tgt_sat, tgt_sen), src_imname=src_im.basename)
# update the reference cubes on disk
if self.dir_refcubes:
path_out = os.path.join(self.dir_refcubes, 'refcube__%s__%s__nclust%s__nsamp%s.bsq'
% (tgt_sat, tgt_sen, n_clusters, tgt_n_samples))
updated_refcube = self.refcubes[(tgt_sat, tgt_sen)] # type: GeoArray
% (tgt_sat, tgt_sen, self.n_clusters, self.tgt_n_samples))
updated_refcube = self.refcubes[(tgt_sat, tgt_sen)] # type: GeoArray
updated_refcube.save(out_path=path_out, fmt=fmt_out)
return self.refcubes
def _classify(self, im, n_clusters, progress, spec_rsp_sat='Sentinel-2A', spec_rsp_sen='MSI'):
    # type: (Union[str, GeoArray, np.ndarray], int, bool, str, str) -> np.ndarray
    """Compute a KMeans classification map for the given image.

    :param im:           image to be classified (path, GeoArray or numpy array)
    :param n_clusters:   number of KMeans clusters to compute
    :param progress:     whether to show progress bars
    :param spec_rsp_sat: satellite code used for intermediate spectral resampling
                         (dimensionality reduction before clustering); requires spec_rsp_sen
    :param spec_rsp_sen: sensor code used for intermediate spectral resampling
    :return:             2D classification array (KMeansRSImage.im_clust)
    """
    # input validation
    if spec_rsp_sat and not spec_rsp_sen or spec_rsp_sen and not spec_rsp_sat:
        raise ValueError("The parameters 'spec_rsp_sat' and 'spec_rsp_sen' must both be provided or completely "
                         "omitted.")

    im2clust = GeoArray(im)

    # first, perform spectral resampling to Sentinel-2 to reduce dimensionality
    if spec_rsp_sat and spec_rsp_sen:
        tgt_srf = SRF(dict(Satellite=spec_rsp_sat, Sensor=spec_rsp_sen, Subsystem=None, image_type='RSD',
                           proc_level='L1A', logger=self.logger))
        im2clust = self.resample_spectra(im2clust, tgt_srf, progress=progress)

    # NOTE(review): clustering runs on the (possibly spectrally resampled) image
    kmeans = KMeansRSImage(im2clust, n_clusters=n_clusters, CPUs=self.CPUs)

    return kmeans.im_clust
def cluster_image_and_get_uniform_spectra(self, im, n_clusters, tgt_n_samples,
spec_rsp_sat='Sentinel-2A', spec_rsp_sen='MSI', progress=False):
# type: (Union[str, GeoArray, np.ndarray], int, int) -> np.ndarray
def cluster_image_and_get_uniform_spectra(self, im, downsamp_sat='Sentinel-2A', downsamp_sen='MSI', progress=False):
# type: (Union[str, GeoArray, np.ndarray], str, str, bool) -> np.ndarray
"""Compute KMeans clusters for the given image and return the an array of uniform random samples.
:param im: image to be clustered
:param n_clusters: number of clusters to use
:param tgt_n_samples: number of returned random samples
:param downsamp_sat: satellite code used for intermediate image dimensionality reduction (input image is
spectrally resampled to this satellite before it is clustered). requires downsamp_sen.
If it is None, no intermediate downsampling is performed.
:param downsamp_sen: sensor code used for intermediate image dimensionality reduction (required downsamp_sat)
:param progress: whether to show progress bars or not
:return: 2D array (rows: tgt_n_samples, columns: spectral information / bands)
"""
# input validation
if spec_rsp_sat and not spec_rsp_sen or spec_rsp_sen and not spec_rsp_sat:
if downsamp_sat and not downsamp_sen or downsamp_sen and not downsamp_sat:
raise ValueError("The parameters 'spec_rsp_sat' and 'spec_rsp_sen' must both be provided or completely "
"omitted.")
im2clust = GeoArray(im)
# first, perform spectral resampling to Sentinel-2 to reduce dimensionality
if spec_rsp_sat and spec_rsp_sen:
tgt_srf = SRF(dict(Satellite=spec_rsp_sat, Sensor=spec_rsp_sen, Subsystem=None, image_type='RSD',
if downsamp_sat and downsamp_sen:
tgt_srf = SRF(dict(Satellite=downsamp_sat, Sensor=downsamp_sen, Subsystem=None, image_type='RSD',
proc_level='L1A', logger=self.logger))
im2clust = self.resample_image_spectrally(im2clust, tgt_srf, progress=progress)
# compute KMeans clusters for the spectrally resampled image
self.logger.info('Computing %s KMeans clusters from the input image %s...' % (n_clusters, im2clust.basename))
kmeans = KMeansRSImage(im2clust, n_clusters=n_clusters, CPUs=self.CPUs, v=self.v)
self.logger.info('Computing %s KMeans clusters from the input image %s...'
% (self.n_clusters, im2clust.basename))
kmeans = KMeansRSImage(im2clust, n_clusters=self.n_clusters, CPUs=self.CPUs, v=self.v)
if self.v:
kmeans.plot_cluster_centers()
kmeans.plot_cluster_histogram()
# randomly grab the given number of spectra from each cluster
self.logger.info('Getting %s random spectra from each cluster...' % (tgt_n_samples // n_clusters))
random_samples = \
kmeans.get_random_spectra_from_each_cluster(src_im=GeoArray(im), samplesize=tgt_n_samples // n_clusters)
self.logger.info('Getting %s random spectra from each cluster...' % (self.tgt_n_samples // self.n_clusters))
random_samples = kmeans.get_random_spectra_from_each_cluster(src_im=GeoArray(im),
samplesize=self.tgt_n_samples // self.n_clusters)
# combine the spectra (2D arrays) of all clusters to a single 2D array
self.logger.info('Combining random samples from all clusters.')
......@@ -759,13 +780,23 @@ class SpecHomo_Classifier2(object):
return tgt_im
def add_spectra_to_refcube(self, spectra, tgt_sat_sen, src_imname):
    # type: (np.ndarray, tuple, str) -> None
    """Add a set of spectral signatures to the reference cube (in-mem variable self.refcubes).

    :param spectra:     2D numpy array with rows: spectral samples / columns: spectral information (bands)
    :param tgt_sat_sen: tuple of ('satellite', 'sensor')
    :param src_imname:  image basename of the source hyperspectral image
    """
    # reshape 2D spectra array to one image column (refcube is an image with spectral information in the 3rd dim.)
    im_col = spectra.reshape(spectra.shape[0], 1, spectra.shape[1])

    if self.refcubes[tgt_sat_sen] is not None:
        # append spectra to existing reference cube (stacked as an additional image column)
        new_cube = np.hstack([self.refcubes[tgt_sat_sen], im_col])
        self.refcubes[tgt_sat_sen] = GeoArray(new_cube)
        # TODO add src_imname to GeoArray metadata
    else:
        # add a new GeoArray instance containing given spectra as a single image line
        self.refcubes[tgt_sat_sen] = GeoArray(im_col)
......@@ -10,26 +10,27 @@ Tests for gms_preprocessing.algorithms.L2B_P.SpecHomo_Classifier
import unittest
import os
import tempfile
import numpy as np
from geoarray import GeoArray
from gms_preprocessing import __path__ # noqa E402 module level import not at top of file
from gms_preprocessing import set_config # noqa E402 module level import not at top of file
from gms_preprocessing.algorithms.L2B_P import SpecHomo_Classifier # noqa E402 module level import not at top of file
from gms_preprocessing.algorithms.L2B_P import SpecHomo_Classifier2 # noqa E402 module level import not at top of file
from gms_preprocessing.algorithms.L2B_P import ReferenceCube_Generator_OLD # noqa E402 module level import not at top of file
from gms_preprocessing.algorithms.L2B_P import ReferenceCube_Generator # noqa E402 module level import not at top of file
testdata = os.path.join(__path__[0], '../tests/data/hy_spec_data/Bavaria_farmland_LMU_Hyspex_subset.bsq')
class Test_SpecHomo_Classifier(unittest.TestCase):
class Test_ReferenceCube_Generator_OLD(unittest.TestCase):
"""Tests class for gms_preprocessing.algorithms.L2B_P.SpecHomo_Classifier"""
@classmethod
def setUpClass(cls):
# Testjob Landsat-8
set_config(exec_mode='Python', job_ID=26186196, db_host='geoms', reset_status=True, is_test=True)
cls.SHC = SpecHomo_Classifier([testdata, testdata, ], v=False)
cls.SHC = ReferenceCube_Generator_OLD([testdata, testdata, ], v=False)
def test_generate_reference_cube_L8(self):
ref_cube = self.SHC.generate_reference_cube('Landsat-8', 'OLI_TIRS', n_clusters=10, tgt_n_samples=1000)
......@@ -47,39 +48,56 @@ class Test_SpecHomo_Classifier(unittest.TestCase):
self.assertIsInstance(ref_cube, np.ndarray)
def test_multiprocessing(self):
SHC = SpecHomo_Classifier([testdata, testdata, ], v=False, CPUs=1)
SHC = ReferenceCube_Generator_OLD([testdata, testdata, ], v=False, CPUs=1)
ref_cube_sp = SHC.generate_reference_cube('Landsat-8', 'OLI_TIRS', n_clusters=10, tgt_n_samples=1000)
SHC = SpecHomo_Classifier([testdata, testdata, ], v=False, CPUs=None)
SHC = ReferenceCube_Generator_OLD([testdata, testdata, ], v=False, CPUs=None)
ref_cube_mp = SHC.generate_reference_cube('Landsat-8', 'OLI_TIRS', n_clusters=10, tgt_n_samples=1000)
self.assertTrue(np.any(ref_cube_sp), msg='Singleprocessing result is empty.')
self.assertTrue(np.any(ref_cube_mp), msg='Multiprocessing result is empty.')
class Test_SpecHomo_Classifier2(unittest.TestCase):
class Test_ReferenceCube_Generator(unittest.TestCase):
"""Tests class for gms_preprocessing.algorithms.L2B_P.SpecHomo_Classifier"""
@classmethod
def setUpClass(cls):
# Testjob Landsat-8
set_config(exec_mode='Python', job_ID=26186196, db_host='geoms', reset_status=True, is_test=True)
cls.SHC = SpecHomo_Classifier2([testdata, testdata, ], v=False)
cls.tmpOutdir = tempfile.TemporaryDirectory()
cls.testIms = [testdata, testdata, ]
cls.tgt_sat_sen_list = [
('Landsat-8', 'OLI_TIRS'),
('Landsat-7', 'ETM+'),
('Landsat-5', 'TM'),
('Sentinel-2A', 'MSI'),
# ('Terra', 'ASTER'), # currently does not work
('SPOT-4', 'HRVIR1'),
('SPOT-4', 'HRVIR2'),
('SPOT-5', 'HRG1'),
('SPOT-5', 'HRG2'),
('RapidEye-5', 'MSI')
]
cls.n_clusters = 5
cls.tgt_n_samples = 500
cls.SHC = ReferenceCube_Generator(cls.testIms, dir_refcubes=cls.tmpOutdir.name,
tgt_sat_sen_list=cls.tgt_sat_sen_list,
n_clusters=cls.n_clusters, tgt_n_samples=cls.tgt_n_samples, v=False)
def test__classify(self):
classification = self.SHC._classify(self.SHC.ims_ref[0], n_clusters=10, progress=True)
self.assertIsInstance(classification, np.ndarray)
self.assertEqual(classification.ndim, 2)
@classmethod
def tearDownClass(cls):
    # remove the temporary output directory created in setUpClass
    cls.tmpOutdir.cleanup()
def test_cluster_image_and_get_uniform_samples(self):
src_im = self.SHC.ims_ref[0]
unif_random_spectra = self.SHC.cluster_image_and_get_uniform_spectra(src_im, n_clusters=10, tgt_n_samples=1000)
unif_random_spectra = self.SHC.cluster_image_and_get_uniform_spectra(src_im)
self.assertIsInstance(unif_random_spectra, np.ndarray)
self.assertTrue(unif_random_spectra.shape == (1000, GeoArray(src_im).bands))
self.assertEqual(unif_random_spectra.shape, (self.tgt_n_samples, GeoArray(src_im).bands))
def test_resample_spectra(self):
src_im = GeoArray(self.SHC.ims_ref[0])
unif_random_spectra = self.SHC.cluster_image_and_get_uniform_spectra(src_im, n_clusters=10, tgt_n_samples=1000)
unif_random_spectra = self.SHC.cluster_image_and_get_uniform_spectra(src_im)
from gms_preprocessing.io.input_reader import SRF
tgt_srf = SRF(dict(Satellite='Sentinel-2A', Sensor='MSI', Subsystem=None, image_type='RSD',
......@@ -89,36 +107,20 @@ class Test_SpecHomo_Classifier2(unittest.TestCase):
src_cwl=np.array(src_im.meta.loc['wavelength'], dtype=np.float).flatten(),
tgt_srf=tgt_srf)
self.assertIsInstance(unif_random_spectra_rsp, np.ndarray)
self.assertEqual(unif_random_spectra_rsp.shape, (1000, len(tgt_srf.bands)))
pass
self.assertEqual(unif_random_spectra_rsp.shape, (self.tgt_n_samples, len(tgt_srf.bands)))
def test_generate_reference_cube(self):
refcubes = self.SHC.generate_reference_cubes()
pass
@unittest.SkipTest
def test_generate_reference_cube_L8(self):
ref_cube = self.SHC.generate_reference_cubes('Landsat-8', 'OLI_TIRS', n_clusters=10, tgt_n_samples=1000)
self.assertIsInstance(ref_cube, np.ndarray)
self.assertTrue(np.any(ref_cube), msg='Reference cube for Landsat-8 is empty.')
@unittest.SkipTest
def test_generate_reference_cube_S2A(self):
ref_cube = self.SHC.generate_reference_cubes('Sentinel-2A', 'MSI', n_clusters=10, tgt_n_samples=1000)
self.assertIsInstance(ref_cube, np.ndarray)
self.assertTrue(np.any(ref_cube), msg='Reference cube for Sentinel-2A is empty.')
@unittest.SkipTest
def test_generate_reference_cube_AST(self):
ref_cube = self.SHC.generate_reference_cubes('Terra', 'ASTER', n_clusters=10, tgt_n_samples=1000)
self.assertIsInstance(ref_cube, np.ndarray)
self.assertIsInstance(refcubes, dict)
self.assertIsInstance(refcubes[('Landsat-8', 'OLI_TIRS')], GeoArray)
self.assertEqual(refcubes[('Landsat-8', 'OLI_TIRS')].shape, (self.tgt_n_samples, len(self.testIms), 8))
@unittest.SkipTest
def test_multiprocessing(self):
SHC = SpecHomo_Classifier([testdata, testdata, ], v=False, CPUs=1)
SHC = ReferenceCube_Generator_OLD([testdata, testdata, ], v=False, CPUs=1)
ref_cube_sp = SHC.generate_reference_cube('Landsat-8', 'OLI_TIRS', n_clusters=10, tgt_n_samples=1000)
SHC = SpecHomo_Classifier([testdata, testdata, ], v=False, CPUs=None)
SHC = ReferenceCube_Generator_OLD([testdata, testdata, ], v=False, CPUs=None)
ref_cube_mp = SHC.generate_reference_cube('Landsat-8', 'OLI_TIRS', n_clusters=10, tgt_n_samples=1000)
self.assertTrue(np.any(ref_cube_sp), msg='Singleprocessing result is empty.')
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment