Commit 8830a539 authored by Michael Rudolf

Packaging Phase 2

- Missing imports and other problems have been resolved.
- The software should now work as intended again.
parent e2c1f53c
@@ -8,9 +8,11 @@ Examples/
*.tdms
*.h5
*.asc
*.txt
# Ignore config files
*.ini
*.cfg
# ignore pictures and pdfs
*.pdf
@@ -22,7 +22,7 @@ import sys
import tkinter as tk
import warnings
from pathlib import Path
-from tkinter import filedialog, messagebox
+from tkinter import filedialog
import rstevaluation.RST_Func as rfnc
@@ -248,7 +248,7 @@ class RST_pick_GUI(tk.Tk):
for item in self.params:
try:
Vars[item] = float(self.params[item].get())
-except ValueError as _:
+except ValueError:
Vars[item] = json.loads(self.params[item].get())
self.cfg['parameters'][item] = self.params[item].get()
self.cfg['options']['plot_ts'] = str(self.plot_ts.get())
""" Advanced data analysis functions for ringshear tester data """
import numpy as np
import scipy.optimize as spopt
# =======================MUTUAL LINEAR REGRESSION==============================
def rst_analmut(x, y):
"""
Calculation of friction coefficients (slope, M) and y-axis intercepts
(cohesion, C) by mutual two-point linear regressions.
"""
# n = len(x)
ys = [y[k] for k in range(len(x)) for j in range(len(x)-k)]
xs = [x[k] for k in range(len(x)) for j in range(len(x)-k)]
M = np.array([(y[k+j]-y[k])/(x[k+j]-x[k])
for k in range(len(x)) for j in range(len(x)-k)])
C = np.array([(yi - m*xi) for yi, xi, m in zip(ys, xs, M)])
M = M[np.nonzero(np.isfinite(M))]
C = C[np.nonzero(np.isfinite(C))]
M_avg = np.mean(M)
M_std = np.std(M)
C_avg = np.mean(C)
C_std = np.std(C)
fric_mut = (M_avg, M_std, C_avg, C_std)
data_mut = (M, C)
return fric_mut, data_mut
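# Illustrative usage (values invented for demonstration, not from a real
# experiment): for three normal/shear stress pairs every two-point slope and
# intercept is formed and then averaged:
#   fric_mut, data_mut = rst_analmut([1e3, 2e3, 3e3], [0.6e3, 1.3e3, 1.9e3])
#   M_avg, M_std, C_avg, C_std = fric_mut  # mean/std of slopes and intercepts
#   M_all, C_all = data_mut                # all pairwise slopes/intercepts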
# %%=======================STANDARD LINEAR REGRESSION==========================
def rst_analstd_old(x, y):
""" Correlation of normal and shear stress: y = slope * x+intercept """
x = np.array(x)
y = np.array(y)
n = len(x)
P1 = n*np.sum(x*y)-np.sum(x)*np.sum(y)
P2 = n*np.sum(x**2)-np.sum(x)**2
slope = P1/P2 # coefficient of friction
intercept = (np.sum(x**2)*np.sum(y)-np.sum(x)*np.sum(x*y))/P2 # cohesion
diff = y-intercept-slope*x
s = np.sum(diff**2)/(n-2)
std_slope = np.sqrt(n*s/P2) # standard error Coef. friction
std_intercept = np.sqrt(np.sum(x**2)*s/P2) # standard error cohesion
y_fit = np.polyval([slope, intercept], x)
out_var = (slope, std_slope, intercept, std_intercept, y_fit)
return out_var
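# For reference, the expressions above are the ordinary least-squares normal
# equations:
#   slope     = (n*sum(x*y) - sum(x)*sum(y)) / (n*sum(x**2) - sum(x)**2)
#   intercept = (sum(x**2)*sum(y) - sum(x)*sum(x*y)) / (n*sum(x**2) - sum(x)**2)
# with standard errors derived from the residual variance s = sum(diff**2)/(n-2).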
def rst_analstd(x, y):
"""
Fits shear stress against normal stress with a linear model via
spopt.curve_fit; errors are estimated with fit_bootstrap from 100
randomized sets of data.
"""
x = np.array(x)
y = np.array(y)
pfit, perr = fit_bootstrap(x, y, fitfric)
y_fit = fitfric(x, *pfit)
return (pfit[0], perr[0], pfit[1], perr[1], y_fit)
def fitfric(x, a, b):
""" Linear friction law: y = a*x + b (slope a, intercept b) """
return a*x+b
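# Hypothetical call (numbers invented for illustration only):
#   slope, d_slope, coh, d_coh, y_fit = rst_analstd(
#       [1000, 2000, 4000], [650, 1250, 2400])
# slope is the coefficient of friction, coh the cohesion in Pa; the errors are
# the bootstrap confidence intervals returned by fit_bootstrap.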
# %%=====================VELOCITY STEPPING ANALYSIS============================
def vst_analysis(data):
"""
Velocity-stepping analysis: fits apparent friction against log10(velocity).
"""
mu_app = data['shearstress']/data['normalstress']  # Apparent friction
logvel = np.log10(data['velocity'])
# Remove nonfinite values for fitting
onlyfinite = np.isfinite(mu_app)
mu_app = mu_app[onlyfinite]
logvel = logvel[onlyfinite]
onlyfinite = np.isfinite(logvel)
mu_app = mu_app[onlyfinite]
logvel = logvel[onlyfinite]
pfit, pcov = spopt.curve_fit(
fitfric,
logvel,
mu_app
)
perr = 2*np.sqrt(np.diag(pcov))
name_fit = (
r'y = ({:.4f}$\pm${:.5f})log10(x) +' +
'\n\t\t' +
r'({:.4f}$\pm${:.5f})').format(pfit[0],
perr[0],
pfit[1],
perr[1])
return pfit, perr, name_fit
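# Sketch of a call, assuming `data` is a dictionary as produced by
# rstevaluation.tools.io.convert with numpy arrays under 'shearstress',
# 'normalstress' and 'velocity':
#   pfit, perr, name_fit = vst_analysis(data)
# pfit[0] is the rate dependence d(mu_app)/d(log10 v), pfit[1] the apparent
# friction extrapolated to v = 1 (in the input velocity units); name_fit is a
# label string for plotting.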
# %%======================BOOTSTRAP LINEAR REGRESSION==========================
def fit_bootstrap(datax, datay, function=None,
yerr_systematic=0.0, nsigma=2, nsets=100):
"""
Bootstrap fit of datax and datay: the data are refit nsets times, each time
with added normally distributed noise derived from the standard deviation
of the residuals of an initial fit.
Choose the confidence interval for the parameter estimates via nsigma:
1 sigma corresponds to the 68.3% confidence interval,
2 sigma corresponds to the 95.45% confidence interval.
"""
# Error function producing residuals
def errfunc(p, x, y):
return function(x, *p) - y
# Fit first time
pfit, _ = spopt.curve_fit(function, datax, datay)
# Get the stdev of the residuals
residuals = errfunc(pfit, datax, datay)
sigma_res = nsigma*np.std(residuals)
sigma_err_total = np.sqrt(sigma_res**2 + yerr_systematic**2)
# nsets random data sets are generated and fitted
ps = []
for _ in range(nsets):
randomDelta = np.random.normal(0., sigma_err_total, len(datay))
randomdataY = datay + randomDelta
randomfit, _ = spopt.curve_fit(function, datax, randomdataY)
ps.append(randomfit)
ps = np.array(ps)
mean_pfit = np.mean(ps, axis=0)
err_pfit = nsigma * np.std(ps, axis=0)
return mean_pfit, err_pfit
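# Minimal self-contained sketch with synthetic data (illustration only):
#   xdata = np.linspace(0, 10, 50)
#   ydata = 0.6*xdata + 0.1 + np.random.normal(0, 0.05, 50)
#   pfit, perr = fit_bootstrap(xdata, ydata, fitfric, nsigma=2, nsets=100)
# perr is nsigma times the standard deviation of the parameters over the nsets
# randomized fits, i.e. roughly a 95% interval for nsigma=2.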
# def dilation(path, name, lid_pos, exp_data, prop_file, cfg):
# """ Analysis of dilation and density during experiment """
# def _getexpnum(exp):
# prefix = exp['name'].split(' ')[0]
# if '_' in prefix:
# number = int(prefix.split('_')[-1])
# else:
# number = int(prefix.split('-')[-1])
# return number
# def _getposnum(lpos):
# prefix = lpos[0].split(' ')[0]
# if '_' in prefix:
# number = int(prefix.split('_')[-1])
# else:
# number = int(prefix.split('-')[-1])
# return number
# props = _readpropfile(prop_file)
# num_props = np.array([int(d[0].split('-')[-1]) for d in props['data']])
# num_exp = np.array([_getexpnum(exp) for exp in exp_data])
# num_pos = np.array([_getposnum(lpos) for lpos in lid_pos])
# ref_mass = cfg.getfloat('parameters', 'cellweight')
# masses = np.array([float(d[2]) for d in props['data']]) - ref_mass
# normloads = np.array([float(d[1]) for d in props['data']])
# tardens = np.array([float(d[3]) for d in props['data']])
# area = cfg.getfloat('parameters', 'A')
# linecolor = ['royalblue', 'r', 'g', 'k']
# marker = ['o', '^', 's', 'h']
# label2 = ['Peak', 'Dynamic', 'Reactivation', 'Initial']
# fig, (ax1, ax2) = plt.subplots(ncols=2,
# sharey=True,
# figsize=(10, 5))
# densities = []
# for (i, (mass, norm, tden)) in enumerate(zip(masses, normloads, tardens)):
# n_p = num_props[i]
# n_x = np.nonzero(num_exp == n_p)[0][0]
# n_l = np.nonzero(num_pos == n_p)[0][0]
# exp = exp_data[n_x]
# ini_lid = exp['liddispl'][0]
# lid = lid_pos[n_l][1:]
# lid.append(ini_lid)
# for (j, l) in enumerate(lid):
# density = (mass/1000)/(area*(0.04-l/1000))
# densities.append(density)
# ax1.plot(norm, density,
# marker=marker[j],
# color=linecolor[j],
# label=label2[j] if i == 0 else '')
# ax2.plot(tden, density,
# marker=marker[j],
# color=linecolor[j])
# # all_dens = np.concatenate((densities, tardens))
# fig.legend()
# ax1.set_ylabel('Density during shearing (kg/m$^3$)')
# ax1.set_xlabel('Normal stress (Pa)')
# ax2.set_xlabel('Target density (kg/m$^3$)')
# # ax2.set_xlim(
# # np.min(all_dens),
# # np.max(all_dens)
# # )
# # ax2.set_ylim(ax2.get_xlim())
# fig.savefig(path+name+'_sheardensity.png')
# plt.close()
""" Functions for data handling such as filtering, downsampling and fitting """
import numpy as np
def downsample(data, R):
"""
Downsamples data by factor R using the mean over R values.
Pads the data with NaN at the edges if not divisible by R.
"""
pad_size = int(np.ceil(float(data.size)/R)*R - data.size)
data_padded = np.append(data, np.ones(pad_size)*np.nan)
new_data = np.nanmean(data_padded.reshape(-1, R), axis=1)
return new_data
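# Example of the padding behaviour (illustrative): an array of length 10
# downsampled by R=4 is padded with two NaN values to length 12, so the last
# mean only covers the remaining finite samples:
#   downsample(np.arange(10.0), 4)  # -> array([1.5, 5.5, 8.5])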
""" Functions for file operations for rst-evaluation """
import codecs
import collections
import csv
import operator
import os
import warnings
import nptdms
import numpy as np
import rstevaluation.tools.data as rstdat
def convert(path, file_in, var):
"""Reads data from source files and returns a dictionary with data"""
# possible inputs and linked helper functions
file_convert = {'.asc': readasc,
'.dat': readdat,
'.tdms': readtdms}
(_, ext) = os.path.splitext(os.path.join(path, file_in))
data = file_convert[ext](path, file_in) # data contains basic content
# check if stress data was imported instead of force
if 'normalstress' in data.keys():
data['normalforce'] = data['normalstress'] * var['A']
data['shearforce'] = (
(data['shearstress'] * (var['li']*var['A'])) / var['lo']
)
nsamples = var['nsamples']
if len(data['time']) > nsamples:
freq = 1/data['time'][1]
R = int(freq / 5)
data['velocity'] = rstdat.downsample(data['velocity'], R)
data['shearforce'] = rstdat.downsample(data['shearforce'], R)
data['normalforce'] = rstdat.downsample(data['normalforce'], R)
data['time'] = np.linspace(0, data['time'][-1], len(data['velocity']))
print('Data too large... Downsampled to 5 Hz.')
# Corrections
data['time'] = data['time']-data['time'][0]
# data['liddispl'] = -(data['liddispl']-data['liddispl'][0])
data['velocity'][data['velocity'] < 0] = 0
# Additional data
data['shearstress'] = (data['shearforce']*var['lo'])/(var['li']*var['A'])
data['normalstress'] = data['normalforce']/var['A']
data['displacement'] = np.cumsum(data['time'][1]*data['velocity'])
data['name'] = file_in
print(file_in+' read')
return data
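# Hypothetical call (parameter values invented for illustration; in the GUI
# they come from the configuration file):
#   var = {'A': 0.022619, 'li': 0.0776, 'lo': 0.1250, 'nsamples': 360000}
#   data = convert('/path/to/data/', 'experiment01.tdms', var)
# The returned dictionary then also holds 'shearstress', 'normalstress' and
# 'displacement' derived from the raw channels.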
def readasc(path, file_in):
""" Helper function to read *.asc file """
with codecs.open(os.path.join(path, file_in), encoding='utf-8-sig') as f:
data_load = np.loadtxt(f) # load file for further calculations
# extract data and convert to SI-units
time = data_load[:, 0] # in s
shearforce = data_load[:, 2] * 9.81 # kg -> N
velocity = data_load[:, 4] / 60 # mm/min -> mm/s
liddispl = data_load[:, 3] # in mm
normalforce = data_load[:, 1] * 9.81 # kg -> N
data = {'time': time,
'velocity': velocity,
'normalforce': normalforce,
'shearforce': shearforce,
'liddispl': liddispl,
'name': file_in.split('.')[0]}
return data
def readdat(path, file_in):
""" Helper function to read *.dat file """
# loaders indexed by the file type reported by check_for_type
load_fun = [
alternativeload, standardload, pascalload
]
filetype = check_for_type(os.path.join(path, file_in))
print('%s of type: %s' % (file_in, filetype))
if not filetype:  # 0 or None: unknown layout
raise ValueError('Unknown *.dat file layout: %s' % file_in)
with codecs.open(
os.path.join(path, file_in),
encoding='utf-8-sig'
) as f:
try:
data = load_fun[filetype](f, file_in)
except ValueError:
f.seek(0)  # rewind before retrying with the fallback loader
data = alternativeload(f, file_in)
return data
def readtdms(path, file_in):
""" Helper function to read *.tdms file """
if int(nptdms.__version__[0]) < 1:
# Deprecated file interface
warnings.warn(
'Your version of NPTDMS is out of date, consider updating it.',
DeprecationWarning
)
f = nptdms.TdmsFile(os.path.join(path, file_in))
time = f.group_channels('Untitled')[0].time_track() # in [s]
df_raw = dict()
for channel in f.group_channels('Untitled'):
channel_name = channel.path.replace('\'', '').split('/')[-1]
df_raw[channel_name] = channel.data
shearforce = df_raw['Shear Force'] # in [N]
normalforce = df_raw['Normal Force'] # in [N]
liddispl = df_raw['Lid Displacement'] # in [mm]
velocity = df_raw['Velocity'] # in [mm/s]
else:
with nptdms.TdmsFile.open(os.path.join(path, file_in)) as f:
time = f['Untitled']['Shear Force'].time_track() # in [s]
shearforce = f['Untitled']['Shear Force'][:] # in [N]
normalforce = f['Untitled']['Normal Force'][:] # in [N]
liddispl = f['Untitled']['Lid Displacement'][:] # in [mm]
velocity = f['Untitled']['Velocity'][:] # in [mm/s]
data = {'time': time,
'velocity': velocity,
'normalforce': normalforce,
'shearforce': shearforce,
'liddispl': liddispl,
'name': file_in.split('.')[0]}
return data
def readpropfile(prop_file):
""" Reads the properties of the experiment from the csv file """
props = dict()
with open(prop_file, 'r') as f:
csv_reader = csv.reader(f)
props['names'] = next(csv_reader)
props['units'] = next(csv_reader)
props['data'] = [row for row in csv_reader]
return props
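# Assumed layout of the properties csv file (column names and values are
# purely illustrative, the function itself does not prescribe them):
#   name,normal load,cell weight,target density
#   -,Pa,g,kg/m3
#   exp-01,5000,152.3,1710
#   exp-02,10000,153.1,1705
# props['names'] and props['units'] hold the first two rows, props['data'] the
# remaining rows as lists of strings.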
def standardload(f, file_in):
"""
Standard loading
"""
data_load = np.loadtxt(f, delimiter=';', skiprows=3)
time = data_load[:, 0] # in s
normalforce = data_load[:, 1] # in N
shearforce = data_load[:, 2] # in N
liddispl = data_load[:, 3] # in mm
velocity = data_load[:, 4] # in mm/s
data = {
'time': time,
'velocity': velocity,
'normalforce': normalforce,
'shearforce': shearforce,
'liddispl': liddispl,
'name': file_in.split('.')[0]
}
return data
def alternativeload(f, file_in):
"""
Loads data in a slightly different way to ensure backwards
compatibility
"""
data_load = np.loadtxt(f, delimiter='\t', skiprows=1)
time = data_load[:, 0] # in s
velocity = data_load[:, 1] # in mm/s
normalforce = data_load[:, 2] # in N
shearforce = data_load[:, 3] # in N
data = {
'time': time,
'velocity': velocity,
'normalforce': normalforce,
'shearforce': shearforce,
'name': file_in.split('.')[0]
}
return data
def pascalload(f, file_in):
"""
Loads a file with Pascal as main units
"""
data_load = np.loadtxt(f, delimiter=';', skiprows=3)
time = data_load[:, 0] # in s
normalstress = data_load[:, 1] # in Pa
shearstress = data_load[:, 2] # in Pa
liddispl = data_load[:, 3] # in mm
velocity = data_load[:, 4] # in mm/s
data = {
'time': time,
'velocity': velocity,
'normalstress': normalstress,
'shearstress': shearstress,
'liddispl': liddispl,
'name': file_in.split('.')[0]
}
return data
def check_for_type(filepath):
"""
Checks what kind of *.dat file we have.
Returns:
0: if the layout is unknown
1: if the data is in Newton
2: if the data is in Pascal
"""
with open(filepath, 'rt') as fhandle:
for i, row in enumerate(csv.reader(fhandle)):
if i == 2:
row = row[0]
if '[N]' in row:
return 1
elif '[Pa]' in row:
return 2
else:
return 0
elif i > 2:
break
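# The decision is based only on the third line of the file, e.g. a header row
# such as (invented for illustration)
#   Time [s];Normal force [N];Shear force [N];Lid displ. [mm];Velocity [mm/s]
# yields 1, while a header containing '[Pa]' yields 2:
#   check_for_type(os.path.join(path, 'experiment01.dat'))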
def saveTS(path, name, exp_data, normal_stress):
""" Saves time series in a txt file """
# create header
header = ['Time [s]']
for n in normal_stress:
header.append(n)
# position of longest measurement
data_lens = [len(ex['time']) for ex in exp_data]
pos_dfmax = np.argmax(data_lens)
max_len = np.max(data_lens)
time_max = exp_data[pos_dfmax]['time']
# create array to save to file
# ts_data = np.zeros((max_len, len(exp_data)+1))
ts_data = ['%.5f' % t for t in time_max]
for ex in exp_data:
pads = np.ones(max_len-len(ex['shearstress']))*np.nan
padded = np.hstack((np.around(ex['shearstress'], 2), pads))
padded_str = ['%.2f' % n for n in padded]
ts_data = np.vstack((ts_data, padded_str))
ts_data = ts_data.T
# create file and save it
with open(path+name+'_ts.txt', 'w+') as f:
csvout = csv.writer(f,
delimiter='\t',
lineterminator='\n')
csvout.writerows([header])
csvout.writerows(ts_data)
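# Resulting file layout (tab separated, numbers purely illustrative): the
# first column is the time axis of the longest experiment, every further
# column one experiment's shear stress, padded with nan at the end:
#   Time [s]  5000.0   10000.0  20000.0
#   0.00000   612.31   1225.02  2449.87
#   0.20000   613.05   1226.11  2451.20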
def saveStrength(path, name, strength):
""" Saves the picked datapoints. """
label = ['_peak', '_dynamic', '_reactivation']
for i in range(0, 3):
Stren = np.vstack((np.round(strength[0], 2),
np.round(strength[i+1], 2)))
header_stress = 'Normal stress [Pa]'+'\t'+'Shear strength [Pa]'
with open(path+name+label[i]+'.txt', "w") as f:
f.write(header_stress+'\n')
csvout = csv.writer(f,
delimiter='\t',
lineterminator='\n')
csvout.writerows(Stren.T)
def saveFric(path, name, fricmut, fricstd):
""" Saves the fit data. """
header = '# Parameter' + \
'\t' + \
'Coeff. of internal friction' + \
'\t' + \
'Std. deviation (Coeff.)' + \
'\t' + \
'Cohesion [Pa]' + \
'\t' + \
'Std deviation (Coh.) [Pa]'
with open(path+name+'_fricstd.txt', 'w') as f:
f.write(header+'\n')
f_string = ''
f_string += 'Peak friction:' + \
'\t' + \
str(np.around(fricstd[0][0], 4)) + \
'\t'+str(np.around(fricstd[0][1], 4)) + \
'\t'+str(np.around(fricstd[0][2], 4)) + \
'\t'+str(np.around(fricstd[0][3], 4)) + \
'\n'
f_string += 'Dynamic friction:' + \
'\t' + \
str(np.around(fricstd[1][0], 4)) + \
'\t' + \
str(np.around(fricstd[1][1], 4)) + \
'\t' + \
str(np.around(fricstd[1][2], 4)) + \
'\t' + \
str(np.around(fricstd[1][3], 4)) + \
'\n'
f_string += 'Reactivation friction:' + \
'\t' + \
str(np.around(fricstd[2][0], 4)) + \
'\t' + \
str(np.around(fricstd[2][1], 4)) + \
'\t' + \
str(np.around(fricstd[2][2], 4)) + \
'\t' + \
str(np.around(fricstd[2][3], 4)) + \
'\n'
f.write(f_string)
with open(path+name+'_fricmut.txt', 'w') as f:
f.write(header+'\n')
f_string = ''
f_string += 'Peak friction:' + \
'\t' + \
str(np.around(fricmut[0][0], 4)) + \
'\t' + \
str(np.around(fricmut[0][1], 4)) + \
'\t' + \
str(np.around(fricmut[0][2], 4)) + \
'\t' + \
str(np.around(fricmut[0][3], 4)) + \
'\n'
f_string += 'Dynamic friction:' + \
'\t' + \
str(np.around(fricmut[1][0], 4)) + \
'\t' + \
str(np.around(fricmut[1][1], 4)) + \
'\t' + \
str(np.around(fricmut[1][2], 4)) + \
'\t' + \