Commit 8779bd4d authored by Sebastian Heimann's avatar Sebastian Heimann
Browse files

can now suspend and continue

parent 87dc7e94
# -*- coding: utf-8 -*-
import numpy as num
import logging, os, shutil, sys, glob, copy, math
from multiprocessing import Pool
import logging, os, shutil, sys, glob, copy, math, signal, errno
from tempfile import mkdtemp
from subprocess import Popen, PIPE
......@@ -13,6 +9,7 @@ from guts import *
from guts_array import *
from pyrocko import trace, util, cake
from pyrocko import gf
from pyrocko.parimap import parimap
Timing = gf.meta.Timing
......@@ -339,8 +336,14 @@ class QSSPConfigFull(QSSPConfig):
return template % d
class QSSPError(Exception):
class QSSPError(gf.store.StoreError):
pass
class Interrupted(gf.store.StoreError):
def __str__(self):
return 'Interrupted.'
class QSSPRunner:
......@@ -370,19 +373,30 @@ class QSSPRunner:
os.chdir(self.tempdir)
interrupted = []
def signal_handler(signum, frame):
os.kill(proc.pid, signal.SIGTERM)
interrupted.append(True)
original = signal.signal(signal.SIGINT, signal_handler)
try:
proc = Popen(program, stdin=PIPE, stdout=PIPE, stderr=PIPE)
except OSError:
os.chdir(old_wd)
raise QSSPError('could not start qssp: "%s"' % program)
try:
proc = Popen(program, stdin=PIPE, stdout=PIPE, stderr=PIPE)
except OSError:
os.chdir(old_wd)
raise QSSPError('could not start qssp: "%s"' % program)
(output_str, error_str) = proc.communicate('input\n')
(output_str, error_str) = proc.communicate('input\n')
finally:
signal.signal(signal.SIGINT, original)
if interrupted:
raise KeyboardInterrupt()
logger.debug('===== begin qssp output =====\n'
'%s===== end qssp output =====' % output_str)
if error_str:
logger.error('===== begin qssp error =====\n'
'%s===== end qssp error =====' % error_str)
errmess = []
if proc.returncode != 0:
......@@ -432,8 +446,12 @@ qssp has been invoked as "%s"'''.lstrip() %
return traces
def __del__(self):
if not self.keep_tmp:
shutil.rmtree(self.tempdir)
if self.tempdir:
if not self.keep_tmp:
shutil.rmtree(self.tempdir)
self.tempdir = None
else:
logger.warn('not removing temporary directory: %s' % self.tempdir)
class QSSPGFBuilder(gf.builder.Builder):
def __init__(self, store_dir, step, shared, block_size=None, tmp=None ):
......@@ -521,6 +539,9 @@ class QSSPGFBuilder(gf.builder.Builder):
QSSPReceiver( lat=90-d*cake.m2d, lon=180., tstart=0.0, distance=d) for
d in distances ]
if self.step == 0:
gf_filename = 'TEMP' + gf_filename[2:]
gfs = [ QSSPGreen( filename=gf_filename,
depth= sz/km, calculate=(self.step==0) ) ]
......@@ -533,6 +554,10 @@ class QSSPGFBuilder(gf.builder.Builder):
trise=trise, torigin=0.0) ]
runner.run(conf)
gf_path = os.path.join(conf.gf_directory, '?_' + gf_filename)
for s in glob.glob(gf_path):
d = s.replace('TEMP_', 'GF_')
os.rename(s,d)
else:
for mt, gfmap in self.gfmapping[:[3,4][self.gf_set.ncomponents==10]]:
......@@ -546,35 +571,53 @@ class QSSPGFBuilder(gf.builder.Builder):
runner.run(conf)
rawtraces = runner.get_traces()
interrupted = []
def signal_handler(signum, frame):
interrupted.append(True)
original = signal.signal(signal.SIGINT, signal_handler)
self.store.lock()
duplicate_inserts = 0
try:
for itr, tr in enumerate(rawtraces):
if tr.channel in gfmap:
x = tr.meta['distance']
ig, factor = gfmap[tr.channel]
if len(self.store.config.ns) == 2:
args = (sz,x,ig)
else:
args = (rz,sz,x,ig)
if conf.cut:
tmin = self.store.t(conf.cut[0], args[:-1])
tmax = self.store.t(conf.cut[1], args[:-1])
if None in (tmin, tmax):
continue
tr.chop(tmin, tmax)
gf_tr = gf.store.GFTrace.from_trace(tr)
gf_tr.data *= factor
try:
self.store.put(args, gf_tr)
except gf.store.DuplicateInsert, e:
duplicate_inserts += 1
finally:
if duplicate_inserts:
logger.warn('%i insertions skipped (duplicates)' %
duplicate_inserts)
self.store.unlock()
signal.signal(signal.SIGINT, original)
if interrupted:
raise KeyboardInterrupt()
for itr, tr in enumerate(rawtraces):
if tr.channel in gfmap:
x = tr.meta['distance']
ig, factor = gfmap[tr.channel]
if len(self.store.config.ns) == 2:
args = (sz,x,ig)
else:
args = (rz,sz,x,ig)
if conf.cut:
tmin = self.store.t(conf.cut[0], args[:-1])
tmax = self.store.t(conf.cut[1], args[:-1])
if None in (tmin, tmax):
continue
tr.chop(tmin, tmax)
gf_tr = gf.store.GFTrace.from_trace(tr)
gf_tr.data *= factor
self.store.put(args, gf_tr)
self.store.unlock()
logger.info('Done with step %i, block %i / %i' %
(self.step, index+1, self.nblocks))
......@@ -629,23 +672,54 @@ def init(store_dir):
return gf.store.Store.create_editables(store_dir, config=config, extra={'qssp': qssp})
def __work_block(args):
store_dir, step, iblock, shared = args
builder = QSSPGFBuilder(store_dir, step, shared)
builder.work_block(iblock)
try:
store_dir, step, iblock, shared = args
builder = QSSPGFBuilder(store_dir, step, shared)
builder.work_block(iblock)
except KeyboardInterrupt:
raise Interrupted()
except IOError, e:
if e.errno == errno.EINTR:
raise Interrupted()
else:
raise
def build(store_dir, force=False, nworkers=None):
return store_dir, step, iblock
gf.store.Store.create_dependants(store_dir, force)
def build(store_dir, force=False, nworkers=None, continue_=False):
done = set()
status_fn = pjoin(store_dir, '.status')
if not continue_:
gf.store.Store.create_dependants(store_dir, force)
with open(status_fn, 'w') as status:
pass
else:
try:
with open(status_fn, 'r') as status:
for line in status:
done.add(tuple(int(x) for x in line.split()))
except IOError:
raise gf.StoreError('nothing to continue')
shared = {}
for step in (0,1):
builder = QSSPGFBuilder(store_dir, step, shared)
iblocks = builder.all_block_indices()
if nworkers is None or nworkers > 1:
p = Pool(nworkers)
p.map(__work_block, [ (store_dir, step, iblock, shared) for iblock in iblocks ])
iblocks = [ x for x in builder.all_block_indices() if (step, x) not in done ]
del builder
else:
map(__work_block, [ (store_dir, step, iblock, shared) for iblock in iblocks ])
original = signal.signal(signal.SIGINT, signal.SIG_IGN)
try:
for x in parimap(__work_block, [ (store_dir, step, iblock, shared) for iblock in iblocks ],
nprocs=nworkers):
store_dir, step, iblock = x
with open(status_fn, 'a') as status:
status.write('%i %i\n' % (step, iblock))
finally:
signal.signal(signal.SIGINT, original)
os.remove(status_fn)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment