Commit b0be70b7 authored by Michael Rudolf's avatar Michael Rudolf
Browse files

Finish Restructuring.

- Transferred most code from `RST_pick_GUI` into separate files.
- Fixed a minor bug in the mutual linear regression.
- Instead of a `Var`dictionary we now use the full configparser.
- Moved nested functions into dedicated sections to make things easier.
- If TDMS files are analysed, they are now first checked for size and downsampled if necessary.
- Added a few test scripts for testing code.
parent a811c580
This diff is collapsed.
import numpy as np
import rstevaluation.tools.analysis as rstanalysis
def main():
x = np.linspace(0, 100, 10**3)
y = 3 * x + 300
rstanalysis.rst_analmut(x, y)
if __name__ == '__main__':
import timeit
print(
timeit.timeit(
'main()',
setup='from __main__ import main',
number=10
)
)
import nptdms
import numpy as np
"""
Timings for 18 repetitions (typical file number)
current = 18 sec
new_append = 23 sec
new_preall = 23 sec
new_chunked = 29 sec
"""
def main():
# tdms_file_path = '/home/mrudolf/Nextcloud/GitRepos/rst-evaluation/rstevaluation/input/03_example.tdms'
tdms_file_path = '/media/mrudolf/Data/RST-Data/491-01_Glassbeads_200_300_Prague/491-01_10_Glassbeads_200_300_Prague [f=1.00kHz] [2021-04-30_150714].tdms'
new_chunked(tdms_file_path)
def new_chunked(tdms_file_path):
with nptdms.TdmsFile.open(tdms_file_path) as tdms_file:
old_time = tdms_file['Untitled']['Shear Force'].time_track()
dt = old_time[1]
shearforce = []
normalforce = []
liddispl = []
velocity = []
time = []
for chunk in tdms_file.data_chunks():
chunk_len = len(chunk['Untitled']['Shear Force'])
shearforce.append(np.mean(chunk['Untitled']['Shear Force']))
normalforce.append(np.mean(chunk['Untitled']['Normal Force']))
liddispl.append(np.mean(chunk['Untitled']['Lid Displacement']))
velocity.append(np.mean(chunk['Untitled']['Velocity']))
time.append(chunk_len*dt)
time -= time[0]
def new_preall(tdms_file_path):
with nptdms.TdmsFile.open(tdms_file_path) as tdms_file:
old_time = tdms_file['Untitled']['Shear Force'].time_track()
freq = 1/old_time[1]
R = int(freq / 5)
old_len = len(tdms_file['Untitled']['Shear Force'])
new_len = int(old_len/R)
shearforce = np.zeros(new_len)
normalforce = np.zeros(new_len)
liddispl = np.zeros(new_len)
velocity = np.zeros(new_len)
ii = 0
shr_chk = []
nrm_chk = []
lid_chk = []
vel_chk = []
for chunk in tdms_file.data_chunks():
shr_chk.extend(chunk['Untitled']['Shear Force'][:])
nrm_chk.extend(chunk['Untitled']['Normal Force'][:])
lid_chk.extend(chunk['Untitled']['Lid Displacement'][:])
vel_chk.extend(chunk['Untitled']['Velocity'][:])
# if we reach enough points we calculate the mean and append to
# new channel
if len(shr_chk) > R:
shearforce[ii] = np.mean(shr_chk[:R])
shr_chk = shr_chk[R:]
normalforce[ii] = np.mean(nrm_chk[:R])
nrm_chk = nrm_chk[R:]
liddispl[ii] = np.mean(lid_chk[:R])
lid_chk = lid_chk[R:]
velocity[ii] = np.mean(vel_chk[:R])
vel_chk = vel_chk[R:]
ii += 1
time = np.linspace(0, old_time[-1], len(velocity))
def new_append(tdms_file_path):
with nptdms.TdmsFile.open(tdms_file_path) as tdms_file:
old_time = tdms_file['Untitled']['Shear Force'].time_track()
freq = 1/old_time[1]
R = int(freq / 5)
shearforce = []
normalforce = []
liddispl = []
velocity = []
ii = 0
shr_chk = []
nrm_chk = []
lid_chk = []
vel_chk = []
for chunk in tdms_file.data_chunks():
shr_chk.extend(chunk['Untitled']['Shear Force'][:])
nrm_chk.extend(chunk['Untitled']['Normal Force'][:])
lid_chk.extend(chunk['Untitled']['Lid Displacement'][:])
vel_chk.extend(chunk['Untitled']['Velocity'][:])
# if we reach enough points we calculate the mean and append to
# new channel
if len(shr_chk) > R:
shearforce.append(np.mean(shr_chk[:R]))
shr_chk = shr_chk[R:]
normalforce.append(np.mean(nrm_chk[:R]))
nrm_chk = nrm_chk[R:]
liddispl.append(np.mean(lid_chk[:R]))
lid_chk = lid_chk[R:]
velocity.append(np.mean(vel_chk[:R]))
vel_chk = vel_chk[R:]
time = np.linspace(0, old_time[-1], len(velocity))
def old(tdms_file_path):
with nptdms.TdmsFile.open(tdms_file_path) as tdms_file:
time = tdms_file['Untitled']['Shear Force'].time_track() # in [s]
shearforce = tdms_file['Untitled']['Shear Force'][:] # in [N]
normalforce = tdms_file['Untitled']['Normal Force'][:] # in [N]
liddispl = tdms_file['Untitled']['Lid Displacement'][:] # in [mm]
velocity = tdms_file['Untitled']['Velocity'][:] # in [mm/s]
if __name__ == '__main__':
import timeit
print(
timeit.timeit(
'main()',
setup='from __main__ import main',
number=18
)
)
......@@ -12,9 +12,13 @@ def rst_analmut(x, y):
# n = len(x)
ys = [y[k] for k in range(len(x)) for j in range(len(x)-k)]
xs = [x[k] for k in range(len(x)) for j in range(len(x)-k)]
M = np.array([(y[k+j]-y[k])/(x[k+j]-x[k])
for k in range(len(x)) for j in range(len(x)-k)])
C = np.array([(y-m*x) for y, m, x in zip(ys, xs, M)])
M = np.array(
[
(y[k+j]-y[k])/(x[k+j]-x[k])
for k in range(len(x)) for j in range(len(x)-k)
]
)
C = np.array([(y-m*x) for y, m, x in zip(ys, M, xs)])
M = M[np.nonzero(np.isfinite(M))]
C = C[np.nonzero(np.isfinite(C))]
......
""" Functions for data handling such as filtering, downsampling and fitting """
import os
import shutil
import nptdms
import numpy as np
import rstevaluation.tools.files as rstfiles
from tqdm import tqdm
def downsample(data, R):
def downsample_data(data, R):
"""
Downsamples data by factor R using the mean over R values.
Pads the data with NaN at the edges if not divisible by R.
Pads the data with NaN at the edges if not divisible by R so that the last
value uses less data points for the mean.
"""
pad_size = int(np.ceil(float(data.size)/R)*R - data.size)
data_padded = np.append(data, np.ones(pad_size)*np.NaN)
new_data = np.nanmean(data_padded.reshape(-1, R), axis=1)
return new_data
def downsample_file(file_path, freq=5):
"""
Opens a tdms file and saves it in a downsampled version.
Keywords:
- freq: Frequency to downsample to (default=5Hz)
"""
new_file_path = file_path.replace(
'.tdms', '_downsampled.tdms'
)
with nptdms.TdmsFile(file_path) as tdms_file, \
nptdms.TdmsWriter(new_file_path) as new_file:
root_object = nptdms.RootObject(tdms_file.properties)
original_groups = tdms_file.groups()
channels = [
downsample_channel(chan, freq=freq)
for group in original_groups
for chan in group.channels()
]
new_file.write_segment([root_object] + original_groups + channels)
return new_file_path
def downsample_channel(channel, freq):
""" Downsamples a channel to given frequency """
old_freq = 1/channel.time_track()[1]
R = int(old_freq / freq)
props = channel.properties
props['wf_increment'] *= R
downsampled = nptdms.ChannelObject(
group=channel.group_name,
channel=channel.name,
data=downsample_data(channel.data, R),
properties=props
)
return downsampled
def check_tdms(file_list, config):
"""
Checks if a tdms file on the list is longer than allowed. If yes then the
data is automatically downsampled.
"""
max_len = 0
for file_name in tqdm(file_list, desc='Checking TDMS file length'):
cur_len = rstfiles.get_length(
os.path.join(config['paths']['path_in'], file_name)
)
if cur_len > max_len:
max_len = cur_len
if max_len > config.getint('parameters', 'nsamples'):
print(
'Converting folder. Raw data files will be moved to raw_data.'
)
raw_folder = os.path.join(config['paths']['path_in'], 'raw_data')
os.makedirs(
raw_folder,
exist_ok=True
)
for file_name in tqdm(file_list, desc='Downsampling Files'):
file_path = os.path.join(config['paths']['path_in'], file_name)
new_path = os.path.join(raw_folder, file_name)
shutil.move(file_path, new_path)
new_file_path = downsample_file(new_path)
shutil.move(new_file_path, file_path)
......@@ -2,7 +2,9 @@
import codecs
import collections
import configparser
import csv
import json
import operator
import os
......@@ -11,16 +13,21 @@ import numpy as np
import rstevaluation.tools.data as rstdat
def convert(path, file_in, var):
def convert(path, file_in, config):
"""Reads data from source files and returns a dictionary with data"""
if isinstance(config, configparser.ConfigParser):
var = {
k: json.loads(config['parameters'][k])
for k in config['parameters']
}
# possible inputs and linked helper functions
file_convert = {'.asc': readasc,
'.dat': readdat,
'.tdms': readtdms}
(_, ext) = os.path.splitext(os.path.join(path, file_in))
data = file_convert[ext](path, file_in) # data contains basic content
data = file_convert[ext](path, file_in, config)
# check if stress data was imported instead of force
if 'normalstress' in data.keys():
......@@ -54,7 +61,7 @@ def convert(path, file_in, var):
return data
def readasc(path, file_in):
def readasc(path, file_in, config):
""" Helper function to read *.asc file """
with codecs.open(os.path.join(path, file_in), encoding='utf-8-sig') as f:
data_load = np.loadtxt(f) # load file for further calculations
......@@ -65,16 +72,18 @@ def readasc(path, file_in):
velocity = data_load[:, 4] / 60 # mm/min -> mm/s
liddispl = data_load[:, 3] # in mm
normalforce = data_load[:, 1] * 9.81 # kg -> N
data = {'time': time,
data = {
'time': time,
'velocity': velocity,
'normalforce': normalforce,
'shearforce': shearforce,
'liddispl': liddispl,
'name': file_in.split('.')[0]}
'name': file_in.split('.')[0]
}
return data
def readdat(path, file_in):
def readdat(path, file_in, config):
""" Helper function to read *.dat file """
load_fun = [
alternativeload, standardload, pascalload
......@@ -94,7 +103,7 @@ def readdat(path, file_in):
return data
def readtdms(path, file_in):
def readtdms(path, file_in, config):
""" Helper function to read *.tdms file """
if int(nptdms.__version__[0]) < 1:
# Deprecated file interface
......@@ -119,12 +128,14 @@ def readtdms(path, file_in):
liddispl = f['Untitled']['Lid Displacement'][:] # in [mm]
velocity = f['Untitled']['Velocity'][:] # in [mm/s]
data = {'time': time,
data = {
'time': time,
'velocity': velocity,
'normalforce': normalforce,
'shearforce': shearforce,
'liddispl': liddispl,
'name': file_in.split('.')[0]}
'name': file_in.split('.')[0]
}
return data
......@@ -379,3 +390,10 @@ def savelidpos(path, name, p_ind, exp_data):
w.writerow(header)
w.writerows(rows)
return rows
def get_length(file_path):
""" Gives back sample length of tdms data """
with nptdms.TdmsFile.open(file_path) as tdms_file:
length = len(tdms_file['Untitled']['Shear Force'].time_track())
return length
""" Contains class for main graphical user interface. """
import configparser
import json
import os
import pathlib
import re
import sys
import tkinter as tk
from tkinter import filedialog, messagebox
import rstevaluation.tools.interface.processing as rstprocess
class RST_pick_GUI(tk.Tk):
"""Main Window for the RST Picking"""
def __init__(self, parent):
tk.Tk.__init__(self, parent)
# == VARIABLES
self.parent = parent
self.projectname = tk.StringVar()
self.path_in = tk.StringVar()
self.path_out = tk.StringVar()
self.path_cfg = tk.StringVar()
self.proc = tk.StringVar()
self.plot_ts = tk.IntVar()
self.save_ts = tk.IntVar()
self.rev_pick = tk.IntVar()
self.is_VST = tk.IntVar()
self.pred_norm = tk.IntVar()
self.rst = tk.IntVar()
self.cfg = configparser.ConfigParser()
self.cfg.optionxform = str
self.path_cfg.set('rst_evaluation_')
self.params = dict()
self.protocol('WM_DELETE_WINDOW', self.closing_menu)
# Initialize Object
self.initalize()
def initalize(self):
# Read or create config and set entries from config
self.refresh_entries_from_cfg(init=True)
# == WIDGETS and INTERFACE ==
self.grid()
# Project name
self.label_projname = tk.Label(
self,
text='Project:',
anchor='e',
width=20
)
self.label_projname.grid(column=0, row=0, sticky='EW')
self.entry_projname = tk.Entry(
self,
textvariable=self.projectname)
self.entry_projname.grid(column=1, row=0, sticky='EW')
# Input path
self.label_input = tk.Label(
self,
text='Input Folder:',
anchor='e')
self.label_input.grid(column=0, row=1, sticky='EW')
self.entry_input = tk.Entry(
self,
width=80,
textvariable=self.path_in)
self.entry_input.grid(column=1, row=1, sticky='EW')
self.button_input = tk.Button(
self,
text='...')
self.button_input.grid(column=2, row=1)
self.button_input.bind('<ButtonRelease-1>', self.folder_browser)
# Output path
self.label_output = tk.Label(
self,
text='Output Folder:',
anchor='e')
self.label_output.grid(column=0, row=2, sticky='EW')
self.entry_output = tk.Entry(
self,
textvariable=self.path_out)
self.entry_output.grid(column=1, row=2, sticky='EW')
self.button_output = tk.Button(
self,
text='...')
self.button_output.grid(column=2, row=2)
self.button_output.bind('<ButtonRelease-1>', self.folder_browser)
# Options
self.label_options = tk.Label(
self,
text='Options:',
anchor='w',
font=('Helvetica', 12, 'bold'))
self.label_options.grid(column=0, row=3, columnspan=3, sticky='EW')
# Pick Data
self.opt_pick = tk.Checkbutton(
self,
text='Automatic Picking',
anchor='w',
variable=self.rst)
self.opt_pick.grid(column=0, row=4, sticky='EW')
# Save Data
self.opt_save = tk.Checkbutton(
self,
text='Save Data',
anchor='w',
variable=self.save_ts)
self.opt_save.grid(column=0, row=5, sticky='EW')
# Plot Data
self.opt_plot = tk.Checkbutton(
self,
text='Plot time series',
anchor='w',
variable=self.plot_ts)
self.opt_plot.grid(column=0, row=6, sticky='EW')
# Review Picking
self.opt_revpick = tk.Checkbutton(
self,
text='Review picking during run',
anchor='w',
variable=self.rev_pick)
self.opt_revpick.grid(column=1, row=4, sticky='EW')
# Use predefined normal stress
self.opt_pred = tk.Checkbutton(
self,
text='Use predefined normal stress',
anchor='w',
variable=self.pred_norm)
self.opt_pred.grid(column=1, row=5, sticky='EW')
# Is a VST-Test
self.opt_revpick = tk.Checkbutton(
self,
text='VST-Test',
anchor='w',
variable=self.is_VST)
self.opt_revpick.grid(column=1, row=6, sticky='EW')
# More Options
self.button_options = tk.Button(
self,
text='More options...')
self.button_options.grid(column=1, row=7, sticky='w')
self.button_options.bind('<ButtonRelease-1>', self.more_options)
# Start processing
self.button_start = tk.Button(
self,
text='Start Processing',
padx=20)
self.button_start.grid(column=0, row=7, columnspan=3, sticky='w')
self.button_start.bind('<ButtonRelease-1>', self.start_processing)
self.make_dpi_aware()
def get_HWND_dpi(self):
"""Detects high dpi displays and rescales gui in Windows
Adapted from the user 'dingles at stack-overflow"""
if os.name == "nt":
from ctypes import pointer, windll, wintypes
try:
windll.shcore.SetProcessDpiAwareness(1)
except Exception:
print('High-DPI scaling failed')
pass # this will fail on Win Server and maybe early Windows
DPI100pc = 96 # DPI 96 is 100% scaling
DPI_type = 0
# MDT_EFFECTIVE_DPI = 0, MDT_ANGULAR_DPI = 1, MDT_RAW_DPI = 2
winH = wintypes.HWND(self.winfo_id())
monitorhandle = windll.user32.MonitorFromWindow(winH,
wintypes.DWORD(2))
# MONITOR_DEFAULTTONEAREST = 2
X = wintypes.UINT()
Y = wintypes.UINT()
try:
windll.shcore.GetDpiForMonitor(monitorhandle,
DPI_type,
pointer(X),
pointer(Y))
return X.value, Y.value, (X.value + Y.value) / (2 * DPI100pc)
except Exception:
print('Assuming standard dpi scaling.')
return 96, 96, 1 # Assume standard Windows DPI & scaling
else:
return None, None, 1 # What to do for other OSs?
def make_dpi_aware(self):
"""Adapted from the user 'dingles at stack-overflow"""
def TkGeometryScale(s, cvtfunc):
# format "WxH+X+Y"
patt = r"(?P<W>\d+)x(?P<H>\d+)\+(?P<X>\d+)\+(?P<Y>\d+)"
R = re.compile(patt).search(s)
G = str(cvtfunc(R.group("W"))) + "x"
G += str(cvtfunc(R.group("H"))) + "+"
G += str(cvtfunc(R.group("X"))) + "+"
G += str(cvtfunc(R.group("Y")))
return G
self.DPI_X, self.DPI_Y, self.DPI_scaling = self.get_HWND_dpi()
self.TkScale = lambda v: int(float(v) * self.DPI_scaling)
self.TkGeometryScale = lambda s: TkGeometryScale(s, self.TkScale)
def resize_all_elements(self):
"""
Resizes all elements according to the DPI scaling of the window
(WIP)
-----
"""
pass
def plot_window(self, event):
"""Shows a plot window"""
plot_win = tk.Toplevel(self)
plot_win.wm_title('Plot Window')