Rphree_ReactTrans.R 28.7 KB
Newer Older
Marco De Lucia's avatar
Marco De Lucia committed
1
2
3
##  Functions for dealing with surrogate simulations

### Marco De Lucia, delucia@gfz-potsdam.de, 2009-2018
4
### Time-stamp: "Last modified 2019-05-08 17:19:57 delucia"
Marco De Lucia's avatar
Marco De Lucia committed
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19


##' Todo
##'
##' @title Simple explicit 1D forward-Euler advection, multispecies
##' @param conc matrix containing the chemical state
##' @param inflow vector of concentrations entering the inlet
##' @param dx the grid spacing
##' @param dt the required time step
##' @return a matrix containing the state after the advection
##' @author MDL
##' @export
AdvectionPQC <- function(conc, inflow=rep(0,ncol(conc)), dx, dt)
{
    if (is.matrix(conc)) {
20
21
        ##        msg("conc is matrix")
        ## CurrConc <<- conc
Marco De Lucia's avatar
Marco De Lucia committed
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
        ## actually the first iteration does not have the attributes attached
        if ( "immobile" %in% names(attributes(conc)) ) { 
            immobile <- attr(conc,"immobile")
            val <- conc[,-immobile]
        } else {
            msg("input does not have immobile columns!")
            val <- conc
        }
        ## sort the columns of the matrix matching the names of boundary 
        sptr <- colnames(val)
        spin <- names(inflow) 
        ind <- match(spin,sptr)
        if (TRUE %in% is.na(ind)) {
            val <- cbind(val,matrix(0,ncol=sum(is.na(ind)),nrow=nrow(val) ))
            colnames(val) <- c(sptr,spin[is.na(ind)])
        }

        sptr <- colnames(val)
        ind <- match(sptr,spin)
        bound <- numeric(ncol(val))
        bound[!is.na(ind)] <- inflow[na.omit(ind)]
        cj <- rbind(bound,val)
        remnames <- colnames(cj)
        if (typeof(cj)!="double") {
46
47
            msg("cj is not 'double'")
            ## mode(cj) <- "double"
Marco De Lucia's avatar
Marco De Lucia committed
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
        }
        cnew <- -U*dt/dx*diff(cj) + val 

        ## if the original input had immobile attribute and columns
        ## then reattach the columns and the attribute
        if ( "immobile" %in% names(attributes(conc)) ) { 
            res <- cbind(cnew, conc[,immobile])
            
            attr(res,"immobile") <- immobile 
        } else
            res <- cnew

        colnames(res) <- colnames(conc)

        if (typeof(res)!="double") {
63
64
            ##mode(res) <- "double"
            msg("res is not 'double'")
Marco De Lucia's avatar
Marco De Lucia committed
65
66
67
68
69
70
71
        }

        return(res)
    } else {
        cj <- c(inflow,conc)
        cnew <- -U*dt/dx*diff(cj)+conc
        if (typeof(cnew)!="double") {
72
73
            msg("cnew is not double")
            ## mode(cnew) <- "double"
Marco De Lucia's avatar
Marco De Lucia committed
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
        }

        return(cnew)
    }
}

##' Makes sure that we get activities instead of pH/pe
##'
##' The manipulation of the state should be in place
##' @title Ensure that pH/pe are transformed into activities
##' @param state a chemical state (coerced to data.matrix)
##' @return the state where, if necessary, pH is transformed into activity(H+)
##' @author MDL
##' @export
pH2Act <- function(state){
89
90
91
92
93
94
95
    rem_attr <- attributes(state)
    
    stateNames <- colnames(state)
    if (mode(state)!="double" & length(dim(state)) ==2) {
        mat <- apply(state, 2, as.numeric)
    } else {
        mat <- data.matrix(state)
Marco De Lucia's avatar
Marco De Lucia committed
96
    }
97
98

    if ( "pH" %in% stateNames) {
Marco De Lucia's avatar
Marco De Lucia committed
99
        mat[,"pH"] <- 10^(-mat[,"pH"])
100
101
102
        stateNames[stateNames=="pH"] <- "H"
   }
    if ( "pe" %in% stateNames) { 
Marco De Lucia's avatar
Marco De Lucia committed
103
        mat[,"pe"] <- 10^(-mat[,"pe"])
104
        stateNames[stateNames=="pe"] <- "e"
Marco De Lucia's avatar
Marco De Lucia committed
105
106
    }

107
108
109
    if ("immobile" %in% names(rem_attr)) attr(mat,"immobile") <- rem_attr$immobile
    if ("index" %in% names(rem_attr)) attr(mat,"index") <- rem_attr$index
    colnames(mat) <- stateNames
Marco De Lucia's avatar
Marco De Lucia committed
110
111
112
113
114
115
116
117
118
119
120
121
122
    return(mat)
}

##' Makes sure that we get pH/pe instead of activities
##'
##' The manipulation of the state should be in place
##' @title Ensure that activities are transformed into pH/pe
##' @param state a chemical state (coerced to data.matrix)
##' @return the state where, if necessary, activity(H+) is transformed
##'     into pH
##' @author MDL
##' @export
Act2pH <- function(state){
123
124
125
    rem_attr <- attributes(state)

    stateNames <- colnames(state)
Marco De Lucia's avatar
Marco De Lucia committed
126

127
128
129
130
    if (mode(state)!="double" & length(dim(state)) ==2) {
        mat <- apply(state, 2, as.numeric)
    } else {
        mat <- data.matrix(state)
Marco De Lucia's avatar
Marco De Lucia committed
131
    }
132
    if ( "H" %in% stateNames) {
Marco De Lucia's avatar
Marco De Lucia committed
133
        mat[,"H"] <- -log10(mat[,"H"])
134
        stateNames[stateNames=="H"] <- "pH"
Marco De Lucia's avatar
Marco De Lucia committed
135
    }
136
    if ( "e" %in% stateNames) { 
Marco De Lucia's avatar
Marco De Lucia committed
137
        mat[,"e"] <- -log10(mat[,"e"])
138
        stateNames[stateNames=="e"] <- "pe"
Marco De Lucia's avatar
Marco De Lucia committed
139
140
    }

141
142
143
    if ("immobile" %in% names(rem_attr)) attr(mat,"immobile") <- rem_attr$immobile
    if ("index" %in% names(rem_attr)) attr(mat,"index") <- rem_attr$index
    colnames(mat) <- stateNames
Marco De Lucia's avatar
Marco De Lucia committed
144
145
146
147
148
149
150
151
152
153
154
155
156
157
    return(mat)
}


##' TODO
##'
##' @title Recompose a chemical state after reduction
##' @param state the state to be recomposed
##' @param reduced the initial reduced state containing the "index" attribute
##' @return a state with the original (full-grid) number of elements
##' @author MDL
##' @export
RecomposeState <- function(state, reduced)
{
158
159
160
161
162
163
164
165
166
167
    if ("index" %in% names(attributes(reduced))) {
        inds <- attr(reduced,"index")
        ## msg("print state:")
        ## print(state)
        ## msg("print inds:")
        ## print(inds)
    } else stopmsg("no reduction indeces!")
    res <- state[inds,]
    attr(res,"immobile") <- attr(reduced,"immobile")
    return(res)
Marco De Lucia's avatar
Marco De Lucia committed
168
169
170
171
172
173
174
175
176
177
178
179
180
}

##' This functions requires the package \code{mgcv}
##'
##' TODO
##' @title Finds unique problems in a geochemical state
##' @param data the matrix
##' @return a matrix plus an attribute containing the indeces of the
##'     compressed matrix
##' @author MDL
##' @export
ReduceState <- function(data)
{
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
    require(mgcv)
    
    red <- mgcv::uniquecombs(data)
    colnames(red) <- colnames(data)
    attr(red,"immobile") <- attr(data,"immobile") 
    return(red)
}



##' TODO
##' @title 1D Reactive transport simulations using phreeqc or a
##'     surrogate, equilibrium simulations
##' @param setup a list with several input parameters
##' @param init optional, initial state of the simulation
##' @param maxtime depending on \code{step}, the simulation time
##' @param step one of c("time","iter","fix_dt"), defines the time
##'     stepping
##' @param db the chemical database
##' @param procs number of processors to be used in parallel execution
##' @param maxsim number of simulations above which parallelization is
##'     triggered
##' @param reduce logical, defaults to TRUE. Should we find the unique
##'     occurences of a geochemical problem?
##' @param ebreak logical, defaults to FALSE. If TRUE, an early break
##'     is invoked
##' @param writeout logical, defaults to FALSE.
##' @param surrogate logical, defaults to FALSE. Should we use
##'     surrogates instead of phreeqc?
##' @param surrogate.FUN The function which calls the surrogate
##' @param model The list of surrogate models, one element per
##'     variable
##' @param baleq data structure containing the balance equations
##' @param tol absolute tolerance on the mass balance
##' @param call_pqc logical, defaults to TRUE: if TRUE, calls phreeqc
##'     on the surrogate simulations which trespass the tolerance
##' @return a list containing lots of stuff
##' @author MDL
##' @export 
ReactTranspBalanceEq <- function(setup, init, maxtime, step=c("time","iter","fix_dt"),
                                 db="phreeqc.dat", procs=1, maxsim=150,
                                 reduce=TRUE, ebreak=FALSE, writeout=FALSE, surrogate=FALSE,
                                 surrogate.FUN, model, baleq, tol=1E-9, call_pqc=TRUE)
    
{
226
    require(doParallel)
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
    if (surrogate) {
        if (missing(surrogate.FUN))
            stopmsg("Need a valid function to apply surrogate instead of chemistry")
        if (is.function(surrogate.FUN))
            Selected.Surrogate <- surrogate.FUN
    }
    
    step <- match.arg(step)
    ## prototype of the PHREEQC simulation
    base     <- setup$base
    ## SELECTED_OUTPUT and USER_PUNCH sections of the PHREEQC input data
    first    <- setup$first
    ## boundary conditions
    bound    <- setup$bound
    ## prop are the names of the properties for PHREEQC
    prop     <- setup$prop
    immobile <- setup$immobile
    initsim  <- setup$initsim

246
    if (procs > 1) {
247
248
249
250
251
252
253
        if (Sys.info()[["sysname"]]=="Windows") {
            ThisRunCluster <<- parallel::makePSOCKcluster(procs)
        } else {
            ThisRunCluster <<- parallel::makeForkCluster(procs)
        }

        doParallel::registerDoParallel(ThisRunCluster)
254
255
256
        msg("Registered default doParallel cluster with ", procs, "nodes")
    }

257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
    n  <- setup$n
    L  <- setup$len  # m
    U  <- setup$U    # m3/s
    dx <- L/n        # m
    dt <- dx/U * setup$Cf
    msg("Pure advection 1D simulations")

    ## Find out the number of iterations we're about to calculate
    if (step=="fix_dt"){
        dt <- setup$dt
        maxiter <- floor(maxtime/dt)+1
        msg("Fixed time step of ", dt, ". ", maxiter, " iterations required to reach enditme of", maxtime)
    }
    
    if (step=="time") {
        maxiter <- floor(maxtime/dt)+1
        msg("Courant number=", U*dt/dx, "; dt=",dt)
        msg(maxiter,"iterations required") 
    }
    
    if (step=="iter") {
        maxiter <- maxtime
        msg("Courant number=", U*dt/dx, "; dt=",dt,";")
        msg("Simulation will end at approx ",floor(maxiter*dt),"secs") 
    }
    
    ## MDL: add "complete" list. This will be a list of lists. Each
    ## iteration is a 3-components sublist storing state_T (the input
    ## to chemistry, after transport step), state_C (the input to
    ## transport, after chemistry), and the whole PHREEQC output.
    saved_complete <- vector(mode="list",length=maxiter+1)
    pad <- floor(log10(maxiter+1))+1 ## to properly format the step names
    spr_string <- paste0("%0", pad,"d")
    names(saved_complete) <- paste0("step_",sprintf(spr_string,seq(0,maxiter)))
    
    out_inp <<- vector(mode="list",length=length(maxiter))
    out_res <<- vector(mode="list",length=length(maxiter))
    out_bal <<- vector(mode="list",length=length(maxiter))
    out_of_tol <<- vector(mode="list",length=length(maxiter))

    timing <- matrix(NA,ncol=5,nrow=maxiter)

    msg("Loading phreeqc db... ")
    phreeqc::phrLoadDatabaseString(db)
    msg("database loaded.")
302
303
304
305

    if (procs > 1) {
        parallel::clusterCall(cl=ThisRunCluster, phreeqc::phrLoadDatabaseString, db)
        msg("Database loaded on each worker.")
306
307
308
309
        if (surrogate) {
            parallel::clusterExport(cl=ThisRunCluster, varlist=c("model","surrogate.FUN"), envir = environment())
            msg("All workers are setup with the surrogate model.")
        }
310
311
    }

312
313
314
315
316
317
318
319
320
321
322
323
    if (missing(init)) {
        msg("missing initial chemical state")
        if (!is.character(initsim))
            runinitsim <- c(first,base)
        else
            runinitsim <- initsim
        
        msg("running initsim PHREEQC script assuming EQUILIBRIUM and correct PUNCH specifications...")
        tmpfirstrun <- RunPQC(runinitsim)
        msg("assuming homogeneous medium")
        msg("Names gathered after first run:", paste(colnames(tmpfirstrun), collapse=", "))
        state_C <- matrix(rep(tmpfirstrun,n), byrow=TRUE, ncol=length(prop))
324
        colnames(state_C) <- colnames(tmpfirstrun)
325
    } else {
326
        msg("given initial state; assuming correct colnames")
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
        state_C <- init
    }
    
    attr(state_C,"immobile") <- immobile

    ## ssC <<- state_C
    index_saved <- 1
    saved_complete[[index_saved]] <- list(C=Act2pH(state_C))
    index_saved <- index_saved + 1
    
    ## transform the pH/pe back into activities for Advection
    msg("first step, Advection()...")
    state_T <- AdvectionPQC(pH2Act(state_C), inflow=bound, dx=dx, dt=dt)
    msg("Advection done...")
    Tr <- Ts <- 0
    
    ## treat special H+/pH, e-/pe cases
    state_T <- Act2pH(state_T)
Marco De Lucia's avatar
Marco De Lucia committed
345
    
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
    if (reduce)
        Tr <- system.time( reduced <- ReduceState(state_T) )[3]
    else
        reduced <- state_T
    
    msg("first step, chemistry ...")
    Ts <- system.time(inplist <- splitMultiFix(data=reduced, procs=procs, base=base, first=first,
                                               prop=prop, minerals=immobile, nmax=maxsim))[3]
    
    Tm <- system.time(res_C <- RunPQC(inplist, procs=procs))[3]
    attr(res_C,"immobile") <- immobile
    
    msg("iteration 0, CPU-time = ",round(Tm,3),"[s]")

    ## recompose after the reduction
    if (reduce)
        Tr <- Tr + system.time(state_C <- RecomposeState(Act2pH(res_C), reduced))[3]
    else {
        state_C <- res_C
        immobile_col_names <- setup$prop[setup$immobile]
        immobile_inds <- match( immobile_col_names, colnames(state_C) )
        attr(state_C,"immobile") <- immobile_inds
    }
    
    saved_complete[[index_saved]] <- list(T=Act2pH(state_T), C=Act2pH(state_C))
    index_saved <- index_saved + 1
    out_inp[[1]] <<- inplist
    out_res[[1]] <<- res_C
    
    if (ebreak) {
        msg("Early break invoked, bye.")
        return(saved_complete[[1]])
    }
    time <- dt
    iter <- 1

    timing[iter,] <- c(dim(reduced)[1],procs,Tm, Tr, Ts)

    msg("going through iterations now!")
    
    on.exit({
        msg("MAIN LOOP interrupted during iteration",iter,"!!")
        msg("returning last calculated chemistry.")
        print(traceback())
        msg(" Bye.")
        attr(state_C,"timing") <- timing
        saved_complete$currentT <- Act2pH(state_T)
        saved_complete$current <- Act2pH(state_C)
        return(saved_complete)
    })
    
    ## start of the iterations
    while (iter<maxiter) {
        ## iterations
        # TODO: It seems to me that iteration numberins is off by one, 
        # so that the first run on Rphree is actually one and the next ones end
        time <- time + dt
        iter <- iter + 1
        #cat("ENTERING ADV","\n")

        ## transform the pH/pe back into activities
        state_T <- AdvectionPQC(pH2Act(state_C), inflow=bound, dx=dx, dt=dt)

        ## treat special H+/pH, e-/pe cases
        state_T <- Act2pH(state_T)

        ## reduction of the problem
        if(reduce)
            Tr <- system.time(reduced <- ReduceState(state_T))[3]
        else
            reduced <- state_T

        ## this is the place where we switch the simulator to the surrogate model
        ## for final test for the geochemical surrogate model!
        
        if (surrogate) { ## use a surrogate model created by the Surrogate playground script
            Ts <- 0.0        ## no split and distribute for this guy

            Tm <- system.time( tmpsur <- Selected.Surrogate(reduced, model=model) )[3]
            bal <- CheckBalance(baleq,reduced,tmpsur)
            out_bal[[iter]] <<- bal
            totbal <- apply(bal, 1, mae)

            nonok <- which(totbal>tol) ### max(tol,tol2))
            if (length(nonok)>0){
                msg("found", length(nonok), "/", nrow(reduced), " offending simulations",
                    ifelse(call_pqc, ", calling PHREEQC"," "))
                if (call_pqc) {
                    reducednonok <- reduced[nonok, ]
                    Ts <- Ts + system.time(inplist <- splitMultiFix(data=reducednonok, procs=procs, base=base, first=first,
                                                                    prop=prop, minerals=immobile, nmax=maxsim))[3]
                    
                    out_inp[[iter]] <- inplist
                    if (length(nonok)==1) {
                        Tm <- Tm + system.time(pqcres_C <- RunPQC(inplist, procs=1))[3]
                    } else {
                        Tm <- Tm + system.time(pqcres_C <- RunPQC(input=inplist, procs=procs))[3]
                    }
                    ##mat <- apply(state, 2, as.numeric)

                    tmpsur[nonok, ] <- pqcres_C
                }
            }
            res_C <- tmpsur
            attr(res_C,"immobile") <- immobile
        } else { ## no surrogate, the original implementation
            ## new input
            Ts <- system.time(inplist <- splitMultiFix(data=reduced, procs=procs, base=base, first=first,
                                                       prop=prop, minerals=immobile, nmax=maxsim))[3]
            out_inp[[iter]] <<- inplist
            Tm <- system.time(res_C <- RunPQC(inplist, procs=procs))[3]
            attr(res_C,"immobile") <- immobile
        }
        
        ## recompose after the reduction
        if (reduce)
            Tr <- Tr + system.time(state_C <- RecomposeState(res_C, reduced))[3]
        else {
            state_C <- res_C
            immobile_col_names <- setup$prop[setup$immobile]
            immobile_inds <- match( immobile_col_names, colnames(state_C) )
            attr(state_C,"immobile") <- immobile_inds
        }
        
        ## ## transform the pH/pe back into activities
        ## state_C <- pH2Act(state_C)

        out_res[[iter]] <<- res_C

        timing[iter,] <- c( dim(reduced)[1], procs, Tm, Tr, Ts)
        msg("done iteration", iter, "/", maxiter," CPU-time ",round(Tm,3),"[s]")

        saved_complete[[index_saved]] <- list(T=Act2pH(state_T), C=Act2pH(state_C))
        index_saved <- index_saved + 1
        
    }
    on.exit()
483
    
484
485
486
487
488
489
    if (procs > 1) {
        stopCluster(ThisRunCluster)
        msg("Parallelization cluster stopped")
    }


490
491
492
493
494
495
    msg("total time of chemistry: ",round(sum(timing[,3]),3),"seconds")
    msg("total number of simulations: ",sum(timing[,1]))
    msg(" Bye.")
    
    attr(saved_complete,"timing") <- timing
    return(saved_complete)
Marco De Lucia's avatar
Marco De Lucia committed
496
497
}

498

Marco De Lucia's avatar
Marco De Lucia committed
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
##' Please look at the provided example
##' @title 1D Reactive transport simulations using phreeqc or a
##'     surrogate
##' @param setup a list with several input parameters
##' @param init optional, initial state of the simulation
##' @param maxtime depending on \code{step}, the simulation time
##' @param step one of c("time","iter","fix_dt"), defines the time
##'     stepping
##' @param db the chemical database
##' @param procs number of processors to be used in parallel execution
##' @param maxsim number of simulations above which parallelization is
##'     triggered
##' @param reduce logical, defaults to TRUE. Should we find the unique
##'     occurences of a geochemical problem?
##' @param ebreak logical, defaults to FALSE. If TRUE, an early break
##'     is invoked
##' @param writeout logical, defaults to FALSE.
##' @param surrogate logical, defaults to FALSE. Should we use
##'     surrogates instead of phreeqc?
##' @param surrogate.FUN The function which calls the surrogate
##' @param model The list of surrogate models, one element per
##'     variable
##' @param baleq data structure containing the balance equations
##' @param tol absolute tolerance on the mass balance
##' @param call_pqc logical, defaults to TRUE: if TRUE, calls phreeqc
##'     on the surrogate simulations which trespass the tolerance
##' @return a list containing lots of stuff
##' @author MDL
##' @export 
ReactTranspBalanceKin <- function(setup, init, maxtime, step=c("time","iter","fix_dt"),
                                  db="phreeqc.dat", procs=1, maxsim=150,
                                  reduce=TRUE, ebreak=FALSE, writeout=FALSE, surrogate=FALSE,
                                  surrogate.FUN, model, baleq, tol=1E-9, call_pqc=TRUE)
    
{

    if (surrogate) {
        if (missing(surrogate.FUN))
537
            stopmsg("Need a valid function to apply surrogate instead of chemistry")
Marco De Lucia's avatar
Marco De Lucia committed
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
        if (is.function(surrogate.FUN))
            Selected.Surrogate <- surrogate.FUN
    }
    step <- match.arg(step)
    ## prototype of the PHREEQC simulation
    base     <- setup$base
    ## SELECTED_OUTPUT and USER_PUNCH sections of the PHREEQC input data
    first    <- setup$first
    ## boundary conditions
    bound    <- setup$bound
    ## prop are the names of the properties for PHREEQC
    prop     <- setup$prop
    immobile <- setup$immobile
    kin      <- setup$kin
    initsim  <- setup$initsim
    ann      <- setup$ann
554

555

556
    if (procs > 1) {
557
558
559
560
561
562
563
        if (Sys.info()[["sysname"]]=="Windows") {
            ThisRunCluster <<- parallel::makePSOCKcluster(procs)
        } else {
            ThisRunCluster <<- parallel::makeForkCluster(procs)
        }

        doParallel::registerDoParallel(ThisRunCluster)
564
565
566
        msg("Registered default doParallel cluster with ", procs, "nodes")
    }

Marco De Lucia's avatar
Marco De Lucia committed
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
    n  <- setup$n
    L  <- setup$len  # m
    U  <- setup$U    # m3/s
    dx <- L/n        # m
    dt <- floor(dx/U * setup$Cf)
    msg("convective simulations")
    
    ## Find out the number of iterations we're about to calculate
    if (step=="fix_dt"){
        dt <- setup$dt
        maxiter <- floor(maxtime/dt)+1
        msg("Fixed time step of ", dt, ". ", maxiter, " iterations required to reach enditme of", maxtime)
    }
        
    if (step=="time") {
        maxiter <- floor(maxtime/dt)+1
        msg("Courant number=", U*dt/dx, "; dt=",dt)
        msg(maxiter,"iterations required") 
    }

    if (step=="iter") {
        maxiter <- maxtime
        msg("Courant number=", U*dt/dx, "; dt=",dt,";")
        msg("Simulation will end at approx ",floor(maxiter*dt),"secs") 
    }

    ## MDL: add "complete" list. This will be a list of lists. Each
    ## iteration is a 3-components sublist storing state_T (the input
    ## to chemistry, after transport step), state_C (the input to
    ## transport, after chemistry), and the whole PHREEQC output.
    saved_complete <- vector(mode="list",length=maxiter+1)
    pad <- floor(log10(maxiter+1))+1 ## to properly format the step names
    spr_string <- paste0("%0", pad,"d")
    names(saved_complete) <- paste0("step_",sprintf(spr_string,seq(0,maxiter)))
    
    out_inp <<- vector(mode="list",length=length(maxiter))
    out_res <<- vector(mode="list",length=length(maxiter))
    out_bal <<- vector(mode="list",length=length(maxiter))
    
    timing <- matrix(NA,ncol=5,nrow=maxiter)
    msg("Loading phreeqc db... ")
608
    phreeqc::phrLoadDatabaseString(db)
Marco De Lucia's avatar
Marco De Lucia committed
609
    msg("database loaded.")
610
611
    if (procs > 1) {
        parallel::clusterCall(cl=ThisRunCluster, phreeqc::phrLoadDatabaseString, db)
612
613
614
615
616
        if (surrogate) {
            parallel::clusterExport(cl=ThisRunCluster, varlist=c("model","surrogate.FUN"), envir = environment())
            msg("All workers are setup with the surrogate model.")
        }
        
617
    }
Marco De Lucia's avatar
Marco De Lucia committed
618
619
    if (missing(init)) {
        msg("missing initial chemical state")
620
621
        if (is.null(initsim))
            initsim <- c(first,base)
Marco De Lucia's avatar
Marco De Lucia committed
622
623

        msg("running initsim PHREEQC script assuming EQUILIBRIUM and correct PUNCH specifications...")
624
        tmpfirstrun <- RunPQC(initsim)
Marco De Lucia's avatar
Marco De Lucia committed
625
        msg("assuming homogeneous medium")
626
        state_C <- matrix(rep(tmpfirstrun,n), byrow=TRUE, ncol=length(prop))
627
        colnames(state_C) <- colnames(tmpfirstrun)
Marco De Lucia's avatar
Marco De Lucia committed
628
    } else {
629
        msg("given initial state; assuming correct colnames")
Marco De Lucia's avatar
Marco De Lucia committed
630
631
        state_C <- init
    }
632

Marco De Lucia's avatar
Marco De Lucia committed
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
    attr(state_C,"immobile") <- immobile

    ## we save with pe/pH
    index_saved <- 1
    saved_complete[[index_saved]] <- list(C=Act2pH(state_C))
    index_saved <- index_saved + 1

    ## transform the pH/pe back into activities for Advection
    msg("first step, Advection()...")
    state_T <- AdvectionPQC(pH2Act(state_C), inflow=bound, dx=dx, dt=dt)
    Tr <- Ts <- 0

    ## treat special H+/pH, e-/pe cases
    state_T <- Act2pH(state_T)
  
    if (reduce)
        Tr <- system.time( reduced <- ReduceState(state_T) )[3]
    else
        reduced <- state_T
    
    msg("first step, chemistry ...")
    Ts <- system.time(inplist <- splitMultiKin(data=reduced, procs=procs, base=base, first=first, ann=ann, 
                                               prop=prop, minerals=immobile, kin=kin, dt=dt, nmax=maxsim))[3]
    
    Tm <- system.time(tmp <- RunPQC(inplist, procs=procs))[3]
    res_C <- tmp[, prop]
    attr(res_C,"immobile") <- immobile
    
    msg("iteration 0, CPU-time = ",round(Tm,3),"[s]")

    ## recompose after the reduction
    if (reduce)
        Tr <- Tr + system.time(state_C <- RecomposeState(res_C, reduced))[3]
    else {
        state_C <- res_C
        immobile_col_names <- setup$prop[setup$immobile]
        immobile_inds <- match( immobile_col_names, colnames(state_C) )
        attr(state_C,"immobile") <- immobile_inds
    }
    
    saved_complete[[index_saved]] <- list(T=Act2pH(state_T), C=Act2pH(state_C))
    index_saved <- index_saved + 1
    out_inp[[1]] <<- inplist
    out_res[[1]] <<- res_C
    
    if (ebreak) {
        msg("Early break invoked, bye.")
        return(saved_complete[[1]])
    }
    time <- dt
    iter <- 1
684

Marco De Lucia's avatar
Marco De Lucia committed
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
    timing[iter,] <- c(dim(reduced)[1],procs,Tm, Tr, Ts)
    
    msg("going through iterations now!")
    
    on.exit({
        msg("MAIN LOOP interrupted during iteration",iter,"!!")
        msg("returning last calculated chemistry.")
        print(traceback())
        msg(" Bye.")
        attr(state_C,"timing") <- timing
        saved_complete$currentT <- Act2pH(state_T)
        saved_complete$current <- Act2pH(state_C)
        return(saved_complete)
    })
    
    ## start of the iterations
    while (iter<maxiter) {
        ## iterations
        # TODO: It seems to me that iteration numberins is off by one, 
        # so that the first run on Rphree is actually one and the next ones end
        time <- time + dt
        iter <- iter + 1
        #cat("ENTERING ADV","\n")

        ## transform the pH/pe back into activities
        state_T <- AdvectionPQC(pH2Act(state_C), inflow=bound, dx=dx, dt=dt)

        ## reduction of the problem
        if(reduce)
            Tr <- system.time(reduced <- ReduceState(state_T))[3]
        else
            reduced <- state_T

        ## transform the activity back into pH/pe
        reduced <- Act2pH(reduced)

        ## this is the place where we switch the simulator to the surrogate model
        ## for final test for the geochemical surrogate model!
        
        if (surrogate) { ## use a surrogate model created by the Surrogate playground script
            Ts <- 0.0        ## no split and distribute for this guy
            
            Tm <- system.time( tmpsur <- Selected.Surrogate(reduced, model=model) )[3]
            ## print(head(tmpsur))
            bal <- CheckBalance(baleq,reduced,tmpsur)
            out_bal[[iter]] <<- bal
            totbal <- apply(bal, 1, mae)
            
            ## cat(":: Surrogate Balance Summary:\n")
            ## print(summary(totbal))
            ## tol2 <- unname(quantile(totbal, .85))
            ## cat(" :RT: Balance Checked\n")
            
            nonok <- which(totbal>tol)
            if (length(nonok)>0){
                msg("found", length(nonok), "/", nrow(reduced), " offending simulations",
                    ifelse(call_pqc, ", calling PHREEQC"," "))
                if (call_pqc) {
                    reducednonok <- reduced[nonok, ]
                    Ts <- Ts + system.time(inplist <- splitMultiKin(data=reducednonok, procs=procs, base=base, first=first, ann=ann,
                                                                    prop=prop, minerals=immobile, kin=kin, dt=dt, nmax=maxsim))[3]
                    
                    out_inp[[iter]] <<- inplist
                    if (length(nonok)==1) {
                        Tm <- Tm + system.time(tmp2 <- RunPQC(inplist, procs=1))[3]
                        pqcres_C <- tmp2[,prop]
                    } else {
                        Tm <- Tm + system.time(tmp2 <- RunPQC(input=inplist, procs=procs))[3]
                        pqcres_C <- tmp2[,prop]
                    }
                    
                    tmpsur[nonok, ] <- pqcres_C
                }
            } 
            res_C <- tmpsur
            attr(res_C,"immobile") <- immobile
        } else { ## no surrogate, the original implementation
            ## new input
            Ts <- system.time(inplist <- splitMultiKin(data=reduced, procs=procs, base=base, first=first, ann=ann,
                                                       prop=prop, minerals=immobile, kin=kin, dt=dt, nmax=maxsim))[3]
            out_inp[[iter]] <<- inplist
            Tm <- system.time(tmp3 <- RunPQC(inplist, procs=procs))[3]
            res_C <- tmp3[,prop]
            attr(res_C,"immobile") <- immobile
        }
        
        ## recompose after the reduction
        if (reduce)
            Tr <- Tr + system.time(state_C <- RecomposeState(res_C, reduced))[3]
        else {
            state_C <- res_C
            immobile_col_names <- setup$prop[setup$immobile]
            immobile_inds <- match( immobile_col_names, colnames(state_C) )
            attr(state_C,"immobile") <- immobile_inds
        }

        ## transform the pH/pe back into activities
        state_C <- pH2Act(state_C)

        out_res[[iter]] <<- res_C
     
        timing[iter,] <- c( dim(reduced)[1], procs, Tm, Tr, Ts)
        msg("done iteration", iter, "/", maxiter," CPU-time ",round(Tm,3),"[s]")
        
        saved_complete[[index_saved]] <- list(T=Act2pH(state_T), C=Act2pH(state_C))
        index_saved <- index_saved + 1
        
    }
    on.exit()
794
795
796
797
798
799

    if (procs > 1) {
        stopCluster(ThisRunCluster)
        msg("Parallelization cluster stopped")
    }

Marco De Lucia's avatar
Marco De Lucia committed
800
801
802
803
804
805
806
807
    msg("total time of chemistry: ",round(sum(timing[,3]),3),"seconds")
    msg("total number of simulations: ",sum(timing[,1]))
    msg(" Bye.")
    
    attr(saved_complete,"timing") <- timing
    #cat("PASS iteration saving ")
    return(saved_complete)
}