Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
geomultisens
gms_preprocessing
Commits
2736d732
Commit
2736d732
authored
Dec 07, 2017
by
Daniel Scheffler
Browse files
Added first version of revised SpecHomo_Classifier. Bugfix for SRF class. Added tests.
parent
6f07aa50
Pipeline
#1695
failed with stage
in 8 minutes and 28 seconds
Changes
3
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
gms_preprocessing/algorithms/L2B_P.py
View file @
2736d732
...
...
@@ -15,6 +15,7 @@ from pandas.plotting import scatter_matrix
from
typing
import
Union
,
List
# noqa F401 # flake8 issue
from
tqdm
import
tqdm
from
multiprocessing
import
Pool
,
cpu_count
from
glob
import
glob
from
sklearn.cluster
import
k_means_
# noqa F401 # flake8 issue
from
sklearn.model_selection
import
train_test_split
...
...
@@ -155,12 +156,35 @@ class SpectralResampler(object):
return
np
.
array
(
spectrum_rsp
)
def
resample_spectra
(
self
,
spectra
,
chunksize
=
200
,
CPUs
=
None
):
# type: (Union[GeoArray, np.ndarray], int) -> np.ndarray
"""Resample the given spectral signatures according to the spectral response functions of the target instrument.
:param spectra: spectral signatures, provided as 2D array
(rows: spectral samples, columns: spectral information / bands)
:param chunksize: defines how many spectral signatures are resampled per CPU
:param CPUs: CPUs to use for processing
"""
# input validation
if
not
spectra
.
ndim
==
2
:
ValueError
(
"The given spectra array must be 2-dimensional. Received a %s-dimensional array."
%
spectra
.
ndim
)
assert
spectra
.
shape
[
1
]
==
self
.
wvl_src_nm
.
size
# convert spectra to one multispectral image column
im_col
=
spectra
.
reshape
(
spectra
.
shape
[
0
],
1
,
spectra
.
shape
[
1
])
im_col_rsp
=
self
.
resample_image
(
im_col
,
tiledims
=
(
1
,
chunksize
),
CPUs
=
CPUs
)
spectra_rsp
=
im_col_rsp
.
reshape
(
im_col_rsp
.
shape
[
0
],
im_col_rsp
.
shape
[
2
])
return
spectra_rsp
def
resample_image
(
self
,
image_cube
,
tiledims
=
(
20
,
20
),
CPUs
=
None
):
# type: (Union[GeoArray, np.ndarray], tuple) -> np.ndarray
"""Resample the given spectral image cube according to the spectral response functions of the target instrument.
:param image_cube: image (3D array) containing the spectral information in the third dimension
:param tiledims: dimension of tiles to be used during computation
:param tiledims: dimension of tiles to be used during computation
(rows, columns)
:param CPUs: CPUs to use for processing
:return: resampled spectral image cube
"""
...
...
@@ -201,7 +225,7 @@ class SpectralResampler(object):
class
KMeansRSImage
(
object
):
def
__init__
(
self
,
im
,
n_clusters
,
CPUs
=
1
):
def
__init__
(
self
,
im
,
n_clusters
,
CPUs
=
1
,
v
=
False
):
# type: (GeoArray, int) -> None
# privates
...
...
@@ -212,6 +236,7 @@ class KMeansRSImage(object):
self
.
im
=
im
self
.
n_clusters
=
n_clusters
self
.
CPUs
=
CPUs
self
.
v
=
v
@
property
def
clusters
(
self
):
...
...
@@ -231,7 +256,7 @@ class KMeansRSImage(object):
return
self
.
_im_clust
def
compute_clusters
(
self
):
kmeans
=
KMeans
(
n_clusters
=
self
.
n_clusters
,
random_state
=
0
,
n_jobs
=
self
.
CPUs
)
kmeans
=
KMeans
(
n_clusters
=
self
.
n_clusters
,
random_state
=
0
,
n_jobs
=
self
.
CPUs
,
verbose
=
self
.
v
)
self
.
clusters
=
kmeans
.
fit
(
self
.
_im2spectra
(
self
.
im
))
return
self
.
clusters
...
...
@@ -298,7 +323,7 @@ class KMeansRSImage(object):
plt
.
imshow
(
self
.
im_clust
,
plt
.
get_cmap
(
'prism'
),
interpolation
=
'none'
,
extent
=
(
0
,
cols
,
rows
,
0
))
plt
.
show
()
def
get_random_spectra_from_each_cluster
(
self
,
samplesize
=
50
):
def
get_random_spectra_from_each_cluster
(
self
,
samplesize
=
50
,
src_im
=
None
):
# type: (int) -> dict
"""Returns a given number of spectra randomly selected within each cluster.
...
...
@@ -307,13 +332,13 @@ class KMeansRSImage(object):
:param samplesize: number of spectra to be randomly selected from each cluster
:return:
"""
src_im
=
src_im
if
src_im
is
not
None
else
self
.
im
# get DataFrame with columns [cluster_label, B1, B2, B3, ...]
df
=
DataFrame
(
self
.
_im2spectra
(
s
elf
.
im
),
columns
=
[
'B%s'
%
band
for
band
in
range
(
1
,
s
elf
.
im
.
bands
+
1
)],
)
df
=
DataFrame
(
self
.
_im2spectra
(
s
rc_
im
),
columns
=
[
'B%s'
%
band
for
band
in
range
(
1
,
s
rc_
im
.
bands
+
1
)],
)
df
.
insert
(
0
,
'cluster_label'
,
self
.
clusters
.
labels_
)
# get random sample from each cluster and generate a dict like {cluster_label: random_sample}
print
(
'Generating random samples from clusters.'
)
random_samples
=
dict
()
for
label
in
range
(
self
.
n_clusters
):
cluster_subset
=
df
[
df
.
cluster_label
==
label
].
loc
[:,
'B1'
:]
...
...
@@ -340,6 +365,7 @@ class _MachineLearner_RSImage(object):
@
staticmethod
def
_im2spectra
(
geoArr
):
"""Convert images to array of spectra samples (rows: samples; cols: spectral information)."""
return
geoArr
.
reshape
((
geoArr
.
rows
*
geoArr
.
cols
,
geoArr
.
bands
))
def
plot_scatter_matrix
(
self
,
figsize
=
(
15
,
15
)):
...
...
@@ -348,11 +374,11 @@ class _MachineLearner_RSImage(object):
df
=
DataFrame
(
train_X
,
columns
=
[
'Band %s'
%
b
for
b
in
range
(
1
,
self
.
im_X
.
bands
+
1
)])
scatter_matrix
(
df
,
figsize
=
figsize
,
marker
=
'.'
,
hist_kwds
=
{
'bins'
:
50
},
s
=
30
,
alpha
=
0.8
)
plt
.
suptitle
(
'Image X'
)
plt
.
suptitle
(
'Image X
band to band correlation
'
)
df
=
DataFrame
(
train_Y
,
columns
=
[
'Band %s'
%
b
for
b
in
range
(
1
,
self
.
im_Y
.
bands
+
1
)])
scatter_matrix
(
df
,
figsize
=
figsize
,
marker
=
'.'
,
hist_kwds
=
{
'bins'
:
50
},
s
=
30
,
alpha
=
0.8
)
plt
.
suptitle
(
'Image Y'
)
plt
.
suptitle
(
'Image Y
band to band correlation
'
)
class
LinearRegression_RSImage
(
_MachineLearner_RSImage
):
...
...
@@ -450,8 +476,8 @@ class SpecHomo_Classifier(object):
if
self
.
v
:
tgt_srf
.
plot_srfs
()
# Build the reference cube from random samples of each image
# => rows: tgt_n_samples, columns: images, bands: spectral information
# Build the
3D
reference cube from random samples of each image
# => rows: tgt_n_samples,
columns: images, bands:
spectral information
# generate random spectra samples equally for each KMeans cluster
self
.
ref_cube
=
np
.
zeros
((
tgt_n_samples
,
len
(
self
.
ims_ref
),
len
(
tgt_srf
.
bands
)))
...
...
@@ -462,8 +488,11 @@ class SpecHomo_Classifier(object):
self
.
logger
.
info
(
'Generating random samples for %s (shape: %s)'
%
(
os
.
path
.
basename
(
im
),
GeoArray
(
im
).
shape
))
im_rsp
=
self
.
perform_spectral_resampling
(
im
,
tgt_srf
,
progress
=
progress
)
random_samples
=
self
.
cluster_image_and_get_uniform_samples
(
im_rsp
,
n_clusters
,
tgt_n_samples
)
im_rsp
=
\
self
.
perform_spectral_resampling
(
im
,
tgt_srf
,
progress
=
progress
)
random_samples
=
\
self
.
cluster_image_and_get_uniform_samples
(
im_rsp
,
n_clusters
,
tgt_n_samples
)
self
.
logger
.
info
(
'Adding random samples of %s to reference cube...'
%
os
.
path
.
basename
(
self
.
ims_ref
[
im_num
]))
self
.
ref_cube
[:,
im_num
,
:]
=
random_samples
...
...
@@ -478,7 +507,7 @@ class SpecHomo_Classifier(object):
return
self
.
ref_cube
def
perform_spectral_resampling
(
self
,
src_im
,
tgt_srf
,
progress
=
False
):
# type: (
str
, SRF, bool) -> Union[GeoArray, None]
# type: (
Union[str, GeoArray]
, SRF, bool) -> Union[GeoArray, None]
"""Perform spectral resampling of the given image to match the given spectral response functions.
:param src_im: source image to be resampled
...
...
@@ -486,11 +515,16 @@ class SpecHomo_Classifier(object):
:param progress: show progress bar (default: false)
:return:
"""
im_name
=
os
.
path
.
basename
(
src_im
)
# handle src_im provided as file path or GeoArray instance
if
isinstance
(
src_im
,
str
):
im_name
=
os
.
path
.
basename
(
src_im
)
im_gA
=
GeoArray
(
src_im
)
else
:
im_name
=
src_im
.
basename
im_gA
=
src_im
# read input image
self
.
logger
.
info
(
'Reading the input image %s...'
%
im_name
)
im_gA
=
GeoArray
(
src_im
)
im_gA
.
cwl
=
np
.
array
(
im_gA
.
meta
.
loc
[
'wavelength'
],
dtype
=
np
.
float
).
flatten
()
# perform spectral resampling of input image to match spectral properties of target sensor
...
...
@@ -511,7 +545,7 @@ class SpecHomo_Classifier(object):
:param im: image to be clustered
:param n_clusters: number of clusters to use
:param tgt_n_samples: number of returned random samples
:return:
:return:
2D array (rows: tgt_n_samples, columns: spectral information / bands
"""
# compute KMeans clusters for the spectrally resampled image
self
.
logger
.
info
(
'Computing %s KMeans clusters...'
%
n_clusters
)
...
...
@@ -530,3 +564,208 @@ class SpecHomo_Classifier(object):
random_samples
=
np
.
vstack
([
random_samples
[
clusterlabel
]
for
clusterlabel
in
random_samples
])
return
random_samples
class
SpecHomo_Classifier2
(
object
):
def
__init__
(
self
,
filelist_refs
,
dir_refcubes
=
''
,
v
=
False
,
logger
=
None
,
CPUs
=
None
):
# type: (List[str], str, bool, logging.Logger, Union[None, int]) -> None
"""
:param filelist_refs: list of reference images
"""
# defaults
self
.
tgt_sat_sen_list
=
[
(
'Landsat-8'
,
'OLI_TIRS'
),
(
'Landsat-7'
,
'ETM+'
),
(
'Landsat-5'
,
'TM'
),
(
'Sentinel-2A'
,
'MSI'
),
# ('Terra', 'ASTER'), # currently does not work
(
'SPOT-4'
,
'HRVIR1'
),
(
'SPOT-4'
,
'HRVIR2'
),
(
'SPOT-5'
,
'HRG1'
),
(
'SPOT-5'
,
'HRG2'
),
(
'RapidEye-5'
,
'MSI'
)
]
self
.
tmpdir_multiproc
=
''
# privates
self
.
_refcubes
=
{
sat_sen
:
None
for
sat_sen
in
self
.
tgt_sat_sen_list
}
# args + kwargs
self
.
ims_ref
=
[
filelist_refs
,
]
if
isinstance
(
filelist_refs
,
str
)
else
filelist_refs
self
.
dir_refcubes
=
os
.
path
.
abspath
(
dir_refcubes
)
self
.
v
=
v
self
.
logger
=
logger
or
GMS_logger
(
__name__
)
# must be pickable
self
.
CPUs
=
CPUs
or
cpu_count
()
# validation
if
not
os
.
path
.
isdir
(
self
.
dir_refcubes
):
raise
ValueError
(
"%s is not a directory."
%
self
.
dir_refcubes
)
@
property
def
refcubes
(
self
):
if
not
self
.
_refcubes
:
# fill self._ref_cubes with GeoArray instances of already existing reference cubes read from disk
if
self
.
dir_refcubes
:
for
path_refcube
in
glob
(
os
.
path
.
join
(
self
.
dir_refcubes
,
'refcube__*.bsq'
)):
sat_sen
=
os
.
path
.
basename
(
path_refcube
).
split
(
'__'
)[
1
:
3
]
# type: str
if
sat_sen
in
self
.
tgt_sat_sen_list
:
self
.
_refcubes
[
sat_sen
]
=
GeoArray
(
path_refcube
)
return
self
.
_refcubes
def
generate_reference_cubes
(
self
,
tgt_sat_sen_list
=
None
,
n_clusters
=
10
,
tgt_n_samples
=
1000
,
fmt_out
=
'ENVI'
,
progress
=
True
):
self
.
tgt_sat_sen_list
=
tgt_sat_sen_list
or
self
.
tgt_sat_sen_list
for
im
in
self
.
ims_ref
:
src_im
=
GeoArray
(
im
)
unif_random_spectra
=
\
self
.
cluster_image_and_get_uniform_spectra
(
src_im
,
n_clusters
=
n_clusters
,
tgt_n_samples
=
tgt_n_samples
,
progress
=
progress
)
for
tgt_sat
,
tgt_sen
in
self
.
tgt_sat_sen_list
:
self
.
logger
.
info
(
'Performing spectral resampling to match %s %s specifications...'
%
(
tgt_sat
,
tgt_sen
))
tgt_srf
=
SRF
(
dict
(
Satellite
=
tgt_sat
,
Sensor
=
tgt_sen
,
Subsystem
=
None
,
image_type
=
'RSD'
,
proc_level
=
'L1A'
,
logger
=
self
.
logger
))
unif_random_spectra_rsp
=
\
self
.
resample_spectra
(
unif_random_spectra
,
src_cwl
=
np
.
array
(
src_im
.
meta
.
loc
[
'wavelength'
],
dtype
=
np
.
float
).
flatten
(),
tgt_srf
=
tgt_srf
)
self
.
add_spectra_to_refcube
(
unif_random_spectra_rsp
,
tgt_sat_sen
=
(
tgt_sat
,
tgt_sen
),
src_imname
=
src_im
.
basename
)
if
self
.
dir_refcubes
:
path_out
=
os
.
path
.
join
(
self
.
dir_refcubes
,
'refcube__%s__%s__nclust%s__nsamp%s.bsq'
%
(
tgt_sat
,
tgt_sen
,
n_clusters
,
tgt_n_samples
))
updated_refcube
=
self
.
refcubes
[(
tgt_sat
,
tgt_sen
)]
# type: GeoArray
updated_refcube
.
save
(
out_path
=
path_out
,
fmt
=
fmt_out
)
return
self
.
refcubes
def
_classify
(
self
,
im
,
n_clusters
,
progress
,
spec_rsp_sat
=
'Sentinel-2A'
,
spec_rsp_sen
=
'MSI'
):
# input validation
if
spec_rsp_sat
and
not
spec_rsp_sen
or
spec_rsp_sen
and
not
spec_rsp_sat
:
raise
ValueError
(
"The parameters 'spec_rsp_sat' and 'spec_rsp_sen' must both be provided or completely "
"omitted."
)
im2clust
=
GeoArray
(
im
)
# first, perform spectral resampling to Sentinel-2 to reduce dimensionality
if
spec_rsp_sat
and
spec_rsp_sen
:
tgt_srf
=
SRF
(
dict
(
Satellite
=
spec_rsp_sat
,
Sensor
=
spec_rsp_sen
,
Subsystem
=
None
,
image_type
=
'RSD'
,
proc_level
=
'L1A'
,
logger
=
self
.
logger
))
im2clust
=
self
.
resample_spectra
(
im2clust
,
tgt_srf
,
progress
=
progress
)
kmeans
=
KMeansRSImage
(
im2clust
,
n_clusters
=
n_clusters
,
CPUs
=
self
.
CPUs
)
return
kmeans
.
im_clust
def
cluster_image_and_get_uniform_spectra
(
self
,
im
,
n_clusters
,
tgt_n_samples
,
spec_rsp_sat
=
'Sentinel-2A'
,
spec_rsp_sen
=
'MSI'
,
progress
=
False
):
# type: (Union[str, GeoArray, np.ndarray], int, int) -> np.ndarray
"""Compute KMeans clusters for the given image and return the an array of uniform random samples.
:param im: image to be clustered
:param n_clusters: number of clusters to use
:param tgt_n_samples: number of returned random samples
:return: 2D array (rows: tgt_n_samples, columns: spectral information / bands
"""
# input validation
if
spec_rsp_sat
and
not
spec_rsp_sen
or
spec_rsp_sen
and
not
spec_rsp_sat
:
raise
ValueError
(
"The parameters 'spec_rsp_sat' and 'spec_rsp_sen' must both be provided or completely "
"omitted."
)
im2clust
=
GeoArray
(
im
)
# first, perform spectral resampling to Sentinel-2 to reduce dimensionality
if
spec_rsp_sat
and
spec_rsp_sen
:
tgt_srf
=
SRF
(
dict
(
Satellite
=
spec_rsp_sat
,
Sensor
=
spec_rsp_sen
,
Subsystem
=
None
,
image_type
=
'RSD'
,
proc_level
=
'L1A'
,
logger
=
self
.
logger
))
im2clust
=
self
.
resample_image_spectrally
(
im2clust
,
tgt_srf
,
progress
=
progress
)
# compute KMeans clusters for the spectrally resampled image
self
.
logger
.
info
(
'Computing %s KMeans clusters from the input image %s...'
%
(
n_clusters
,
im2clust
.
basename
))
kmeans
=
KMeansRSImage
(
im2clust
,
n_clusters
=
n_clusters
,
CPUs
=
self
.
CPUs
,
v
=
self
.
v
)
if
self
.
v
:
kmeans
.
plot_cluster_centers
()
kmeans
.
plot_cluster_histogram
()
# randomly grab the given number of spectra from each cluster
self
.
logger
.
info
(
'Getting %s random spectra from each cluster...'
%
(
tgt_n_samples
//
n_clusters
))
random_samples
=
\
kmeans
.
get_random_spectra_from_each_cluster
(
src_im
=
GeoArray
(
im
),
samplesize
=
tgt_n_samples
//
n_clusters
)
# combine the spectra (2D arrays) of all clusters to a single 2D array
self
.
logger
.
info
(
'Combining random samples from all clusters.'
)
random_samples
=
np
.
vstack
([
random_samples
[
clusterlabel
]
for
clusterlabel
in
random_samples
])
return
random_samples
def
resample_spectra
(
self
,
spectra
,
src_cwl
,
tgt_srf
):
# type: (Union[GeoArray, np.ndarray], Union[list, np.array], SRF, bool) -> np.ndarray
"""Perform spectral resampling of the given image to match the given spectral response functions.
:param spectra: 2D array (rows: spectral samples; columns: spectral information / bands
:param src_cwl: central wavelength positions of input spectra
:param tgt_srf: target spectral response functions to be used for spectral resampling
:return:
"""
spectra
=
GeoArray
(
spectra
)
# perform spectral resampling of input image to match spectral properties of target sensor
self
.
logger
.
info
(
'Performing spectral resampling to match spectral properties of target sensor...'
)
SR
=
SpectralResampler
(
src_cwl
,
tgt_srf
)
spectra_rsp
=
SR
.
resample_spectra
(
spectra
,
chunksize
=
200
,
CPUs
=
self
.
CPUs
)
return
spectra_rsp
def
resample_image_spectrally
(
self
,
src_im
,
tgt_srf
,
progress
=
False
):
# type: (Union[str, GeoArray], SRF, bool) -> Union[GeoArray, None]
"""Perform spectral resampling of the given image to match the given spectral response functions.
:param src_im: source image to be resampled
:param tgt_srf: target spectral response functions to be used for spectral resampling
:param progress: show progress bar (default: false)
:return:
"""
# handle src_im provided as file path or GeoArray instance
if
isinstance
(
src_im
,
str
):
im_name
=
os
.
path
.
basename
(
src_im
)
im_gA
=
GeoArray
(
src_im
)
else
:
im_name
=
src_im
.
basename
im_gA
=
src_im
# read input image
self
.
logger
.
info
(
'Reading the input image %s...'
%
im_name
)
im_gA
.
cwl
=
np
.
array
(
im_gA
.
meta
.
loc
[
'wavelength'
],
dtype
=
np
.
float
).
flatten
()
# perform spectral resampling of input image to match spectral properties of target sensor
self
.
logger
.
info
(
'Performing spectral resampling to match spectral properties of target sensor...'
)
SR
=
SpectralResampler
(
im_gA
.
cwl
,
tgt_srf
)
tgt_im
=
GeoArray
(
np
.
zeros
((
*
im_gA
.
shape
[:
2
],
len
(
tgt_srf
.
bands
)),
dtype
=
np
.
int16
),
im_gA
.
gt
,
im_gA
.
prj
)
tiles
=
im_gA
.
tiles
((
1000
,
1000
))
# use tiles to save memory
for
((
rS
,
rE
),
(
cS
,
cE
)),
tiledata
in
(
tqdm
(
tiles
)
if
progress
else
tiles
):
tgt_im
[
rS
:
rE
+
1
,
cS
:
cE
+
1
,
:]
=
SR
.
resample_image
(
tiledata
.
astype
(
np
.
int16
),
CPUs
=
self
.
CPUs
)
return
tgt_im
def
add_spectra_to_refcube
(
self
,
spectra
,
tgt_sat_sen
,
src_imname
):
im_col
=
spectra
.
reshape
(
spectra
.
shape
[
0
],
1
,
spectra
.
shape
[
1
])
if
self
.
refcubes
[
tgt_sat_sen
]
is
not
None
:
new_cube
=
np
.
hstack
([
self
.
refcubes
[
tgt_sat_sen
],
im_col
])
self
.
refcubes
[
tgt_sat_sen
]
=
GeoArray
(
new_cube
)
# TODO add src_imname to GeoArray metadata
else
:
self
.
refcubes
[
tgt_sat_sen
]
=
GeoArray
(
im_col
)
gms_preprocessing/io/input_reader.py
View file @
2736d732
...
...
@@ -170,6 +170,7 @@ def get_list_GMSfiles(dataset_list, target):
def
SRF_reader
(
GMS_identifier
,
v
=
False
):
# type: (dict, bool) -> collections.OrderedDict
"""Read SRF for any sensor and return a dictionary containing band names as keys and SRF numpy arrays as values.
:param GMS_identifier:
...
...
@@ -231,7 +232,7 @@ class SRF(object):
self
.
from_GMS_identifier
(
GMS_identifier
)
def
from_GMS_identifier
(
self
,
GMS_identifier
):
srf_dict
=
SRF_reader
(
GMS_identifier
,
v
=
self
.
v
)
srf_dict
=
SRF_reader
(
GMS_identifier
,
v
=
self
.
v
)
# type: collections.OrderedDict # (ordered according to LBA)
return
self
.
from_dict
(
srf_dict
)
def
from_dict
(
self
,
srf_dict
):
...
...
@@ -274,7 +275,7 @@ class SRF(object):
self
.
srfs
[
bN
]
=
srf
/
np
.
trapz
(
x
=
wvl
,
y
=
srf
)
# TODO seems like we NEED nanometers here; BUT WHY??
self
.
srfs_wvl
=
np
.
array
(
wvl
)
self
.
bands
=
sorted
(
list
(
self
.
srfs
.
keys
()))
self
.
bands
=
list
(
srf_dict
.
keys
())
# = OrderedDict -> order follows LayerBandsAssignment
# FIXME this is not the GMS algorithm to calculate center wavelengths
# calculate center wavelengths
...
...
tests/test_spechomo_classifier.py
View file @
2736d732
...
...
@@ -11,10 +11,12 @@ Tests for gms_preprocessing.algorithms.L2B_P.SpecHomo_Classifier
import
unittest
import
os
import
numpy
as
np
from
geoarray
import
GeoArray
from
gms_preprocessing
import
__path__
# noqa E402 module level import not at top of file
from
gms_preprocessing
import
set_config
# noqa E402 module level import not at top of file
from
gms_preprocessing.algorithms.L2B_P
import
SpecHomo_Classifier
# noqa E402 module level import not at top of file
from
gms_preprocessing.algorithms.L2B_P
import
SpecHomo_Classifier2
# noqa E402 module level import not at top of file
testdata
=
os
.
path
.
join
(
__path__
[
0
],
'../tests/data/hy_spec_data/Bavaria_farmland_LMU_Hyspex_subset.bsq'
)
...
...
@@ -53,3 +55,71 @@ class Test_SpecHomo_Classifier(unittest.TestCase):
self
.
assertTrue
(
np
.
any
(
ref_cube_sp
),
msg
=
'Singleprocessing result is empty.'
)
self
.
assertTrue
(
np
.
any
(
ref_cube_mp
),
msg
=
'Multiprocessing result is empty.'
)
class
Test_SpecHomo_Classifier2
(
unittest
.
TestCase
):
"""Tests class for gms_preprocessing.algorithms.L2B_P.SpecHomo_Classifier"""
@
classmethod
def
setUpClass
(
cls
):
# Testjob Landsat-8
set_config
(
exec_mode
=
'Python'
,
job_ID
=
26186196
,
db_host
=
'geoms'
,
reset_status
=
True
,
is_test
=
True
)
cls
.
SHC
=
SpecHomo_Classifier2
([
testdata
,
testdata
,
],
v
=
False
)
def
test__classify
(
self
):
classification
=
self
.
SHC
.
_classify
(
self
.
SHC
.
ims_ref
[
0
],
n_clusters
=
10
,
progress
=
True
)
self
.
assertIsInstance
(
classification
,
np
.
ndarray
)
self
.
assertEqual
(
classification
.
ndim
,
2
)
def
test_cluster_image_and_get_uniform_samples
(
self
):
src_im
=
self
.
SHC
.
ims_ref
[
0
]
unif_random_spectra
=
self
.
SHC
.
cluster_image_and_get_uniform_spectra
(
src_im
,
n_clusters
=
10
,
tgt_n_samples
=
1000
)
self
.
assertIsInstance
(
unif_random_spectra
,
np
.
ndarray
)
self
.
assertTrue
(
unif_random_spectra
.
shape
==
(
1000
,
GeoArray
(
src_im
).
bands
))
def
test_resample_spectra
(
self
):
src_im
=
GeoArray
(
self
.
SHC
.
ims_ref
[
0
])
unif_random_spectra
=
self
.
SHC
.
cluster_image_and_get_uniform_spectra
(
src_im
,
n_clusters
=
10
,
tgt_n_samples
=
1000
)
from
gms_preprocessing.io.input_reader
import
SRF
tgt_srf
=
SRF
(
dict
(
Satellite
=
'Sentinel-2A'
,
Sensor
=
'MSI'
,
Subsystem
=
None
,
image_type
=
'RSD'
,
proc_level
=
'L1A'
,
logger
=
None
))
unif_random_spectra_rsp
=
\
self
.
SHC
.
resample_spectra
(
unif_random_spectra
,
src_cwl
=
np
.
array
(
src_im
.
meta
.
loc
[
'wavelength'
],
dtype
=
np
.
float
).
flatten
(),
tgt_srf
=
tgt_srf
)
self
.
assertIsInstance
(
unif_random_spectra_rsp
,
np
.
ndarray
)
self
.
assertEqual
(
unif_random_spectra_rsp
.
shape
,
(
1000
,
len
(
tgt_srf
.
bands
)))
pass
def
test_generate_reference_cube
(
self
):
refcubes
=
self
.
SHC
.
generate_reference_cubes
()
pass
@
unittest
.
SkipTest
def
test_generate_reference_cube_L8
(
self
):
ref_cube
=
self
.
SHC
.
generate_reference_cubes
(
'Landsat-8'
,
'OLI_TIRS'
,
n_clusters
=
10
,
tgt_n_samples
=
1000
)
self
.
assertIsInstance
(
ref_cube
,
np
.
ndarray
)
self
.
assertTrue
(
np
.
any
(
ref_cube
),
msg
=
'Reference cube for Landsat-8 is empty.'
)
@
unittest
.
SkipTest
def
test_generate_reference_cube_S2A
(
self
):
ref_cube
=
self
.
SHC
.
generate_reference_cubes
(
'Sentinel-2A'
,
'MSI'
,
n_clusters
=
10
,
tgt_n_samples
=
1000
)
self
.
assertIsInstance
(
ref_cube
,
np
.
ndarray
)
self
.
assertTrue
(
np
.
any
(
ref_cube
),
msg
=
'Reference cube for Sentinel-2A is empty.'
)
@
unittest
.
SkipTest
def
test_generate_reference_cube_AST
(
self
):
ref_cube
=
self
.
SHC
.
generate_reference_cubes
(
'Terra'
,
'ASTER'
,
n_clusters
=
10
,
tgt_n_samples
=
1000
)
self
.
assertIsInstance
(
ref_cube
,
np
.
ndarray
)
@
unittest
.
SkipTest
def
test_multiprocessing
(
self
):
SHC
=
SpecHomo_Classifier
([
testdata
,
testdata
,
],
v
=
False
,
CPUs
=
1
)
ref_cube_sp
=
SHC
.
generate_reference_cube
(
'Landsat-8'
,
'OLI_TIRS'
,
n_clusters
=
10
,
tgt_n_samples
=
1000
)
SHC
=
SpecHomo_Classifier
([
testdata
,
testdata
,
],
v
=
False
,
CPUs
=
None
)
ref_cube_mp
=
SHC
.
generate_reference_cube
(
'Landsat-8'
,
'OLI_TIRS'
,
n_clusters
=
10
,
tgt_n_samples
=
1000
)
self
.
assertTrue
(
np
.
any
(
ref_cube_sp
),
msg
=
'Singleprocessing result is empty.'
)
self
.
assertTrue
(
np
.
any
(
ref_cube_mp
),
msg
=
'Multiprocessing result is empty.'
)
Daniel Scheffler
@danschef
mentioned in commit
4456e245
·
Feb 06, 2018
mentioned in commit
4456e245
mentioned in commit 4456e2451bb8591f1b0c65608b3d8e19e164293c
Toggle commit list
Daniel Scheffler
@danschef
mentioned in commit
0de6e8d4
·
Mar 31, 2020
mentioned in commit
0de6e8d4
mentioned in commit 0de6e8d4edbb83e1d485b645a130aed20b6d8753
Toggle commit list
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment