Commit 9f5dddc2 authored by Daniel Scheffler's avatar Daniel Scheffler
Browse files

Revised compression.decompress.decompress.

parent f6e11a2c
Pipeline #1684 passed with stages
in 1 minute and 5 seconds
......@@ -3,22 +3,31 @@
import os
import zipfile
import tarfile
from logging import Logger
import gzip
from logging import getLogger
import shutil
__author__ = 'Daniel Scheffler'
def decompress(compressed_file, outputpath=None, logger=Logger('decompressor')):
"""Decompresses ZIP, TAR, TAR.GZ and TGZ archives to a given output path.
def decompress(compressed_file, outputpath=None, logger=getLogger('decompressor')):
"""Decompresses ZIP, TAR, TAR.GZ, TGZ and GZ archives to a given output path.
:param compressed_file:
:param outputpath:
:param logger: instance of logging.Logger
"""
filepath, filename = os.path.split(compressed_file)
logger.info('Extracting ' + filename + '...')
outputpath = outputpath or os.path.join(filepath, filename.partition(".")[0])
if not os.path.exists(outputpath):
os.makedirs(outputpath)
# define output folder and filename
in_folder, in_filename = os.path.split(compressed_file)
out_folder, out_filename = os.path.split(outputpath) if outputpath else ('', '')
out_filename = out_filename or in_filename.partition(".")[0]
out_folder = out_folder or in_folder
outputpath = os.path.join(out_folder, out_filename)
# decompress
logger.info('Extracting ' + in_filename + '...')
if not os.path.isdir(out_folder):
os.makedirs(out_folder)
if compressed_file.endswith(".zip"):
assert zipfile.is_zipfile(compressed_file), \
......@@ -30,7 +39,7 @@ def decompress(compressed_file, outputpath=None, logger=Logger('decompressor')):
if os.path.exists(os.path.join(outputpath, n)) and \
zipfile.ZipFile.getinfo(zf, n).file_size == os.stat(os.path.join(outputpath, n)).st_size:
logger.warning("file '%s' from '%s' already exists in the directory: '%s'"
% (n, filename, outputpath))
% (n, in_filename, outputpath))
else:
written = 0
while written == 0:
......@@ -45,12 +54,12 @@ def decompress(compressed_file, outputpath=None, logger=Logger('decompressor')):
else:
raise
if count_extracted == 0:
logger.warning("No files of %s have been decompressed.\n" % filename)
logger.warning("No files of %s have been decompressed.\n" % in_filename)
else:
logger.info("Extraction of '" + filename + " was successful\n")
logger.info("Extraction of '" + in_filename + " was successful\n")
zf.close()
elif compressed_file.endswith(".tar") or compressed_file.endswith(".tar.gz") or compressed_file.endswith(".tgz"):
elif compressed_file.endswith((".tar", ".tar.gz", ".tgz")):
tf = tarfile.open(compressed_file)
names, members = tf.getnames(), tf.getmembers()
count_extracted = 0
......@@ -58,7 +67,7 @@ def decompress(compressed_file, outputpath=None, logger=Logger('decompressor')):
if os.path.exists(os.path.join(outputpath, n)) and \
m.size == os.stat(os.path.join(outputpath, n)).st_size:
logger.warning("file '%s' from '%s' already exists in the directory: '%s'"
% (n, filename, outputpath))
% (n, in_filename, outputpath))
else:
written = 0
while written == 0:
......@@ -73,7 +82,16 @@ def decompress(compressed_file, outputpath=None, logger=Logger('decompressor')):
else:
raise
if count_extracted == 0:
logger.warning("No files of %s have been decompressed.\n" % filename)
logger.warning("No files of %s have been decompressed.\n" % in_filename)
else:
logger.info("Extraction of '" + filename + " was successful\n")
logger.info("Extraction of '" + in_filename + " was successful\n")
tf.close()
elif compressed_file.endswith(".gz"):
with gzip.open(compressed_file, 'rb') as f_in:
with open(outputpath, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
else:
raise ValueError('Unexpected file extension of compressed file. Supported file extensions are: '
'*.zip, *.tar and *.tgz')
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment