Rphree_ReactTrans.R 29.7 KB
Newer Older
Marco De Lucia's avatar
Marco De Lucia committed
1
2
##  Functions for dealing with surrogate simulations

3
### Marco De Lucia, delucia@gfz-potsdam.de, 2009-2021
Marco De Lucia's avatar
Marco De Lucia committed
4
### Time-stamp: "Last modified 2021-04-29 18:33:18 delucia"
Marco De Lucia's avatar
Marco De Lucia committed
5
6


7
8
9
10
11
12
##' This function takes the current state of a chemical system in form
##' of a matrix, a vector of concentrations representing boundary
##' conditions (injection), the required time step (dt/s), grid spcing
##' (dx/m) and U (Darcy velocity/m^3/s). Note that it is
##' responsibility of user to check that the required dt does not
##' trespass the CFL condition.
Marco De Lucia's avatar
Marco De Lucia committed
13
14
##'
##' @title Simple explicit 1D forward-Euler advection, multispecies
15
16
##' @param conc either a vector or a matrix containing the chemical
##'     state to be updated
Marco De Lucia's avatar
Marco De Lucia committed
17
##' @param inflow vector of concentrations entering the inlet
18
19
##' @param dx the grid spacing in m
##' @param dt the required time step in s
20
##' @param U Darcy velocity in m^3/s
Marco De Lucia's avatar
Marco De Lucia committed
21
22
23
##' @return a matrix containing the state after the advection
##' @author MDL
##' @export
24
AdvectionPQC <- function(conc, inflow=rep(0,ncol(conc)), dx, dt, U)
Marco De Lucia's avatar
Marco De Lucia committed
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
{
    if (is.matrix(conc)) {
        ## actually the first iteration does not have the attributes attached
        if ( "immobile" %in% names(attributes(conc)) ) { 
            immobile <- attr(conc,"immobile")
            val <- conc[,-immobile]
        } else {
            msg("input does not have immobile columns!")
            val <- conc
        }
        ## sort the columns of the matrix matching the names of boundary 
        sptr <- colnames(val)
        spin <- names(inflow) 
        ind <- match(spin,sptr)
        if (TRUE %in% is.na(ind)) {
            val <- cbind(val,matrix(0,ncol=sum(is.na(ind)),nrow=nrow(val) ))
            colnames(val) <- c(sptr,spin[is.na(ind)])
        }

        sptr <- colnames(val)
        ind <- match(sptr,spin)
        bound <- numeric(ncol(val))
Marco De Lucia's avatar
Marco De Lucia committed
47
        bound[!is.na(ind)] <- inflow[stats::na.omit(ind)]
Marco De Lucia's avatar
Marco De Lucia committed
48
49
50
        cj <- rbind(bound,val)
        remnames <- colnames(cj)
        if (typeof(cj)!="double") {
51
52
            msg("cj is not 'double'")
            ## mode(cj) <- "double"
Marco De Lucia's avatar
Marco De Lucia committed
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
        }
        cnew <- -U*dt/dx*diff(cj) + val 

        ## if the original input had immobile attribute and columns
        ## then reattach the columns and the attribute
        if ( "immobile" %in% names(attributes(conc)) ) { 
            res <- cbind(cnew, conc[,immobile])
            
            attr(res,"immobile") <- immobile 
        } else
            res <- cnew

        colnames(res) <- colnames(conc)

        if (typeof(res)!="double") {
68
69
            ##mode(res) <- "double"
            msg("res is not 'double'")
Marco De Lucia's avatar
Marco De Lucia committed
70
71
72
73
74
75
76
        }

        return(res)
    } else {
        cj <- c(inflow,conc)
        cnew <- -U*dt/dx*diff(cj)+conc
        if (typeof(cnew)!="double") {
77
78
            msg("cnew is not double")
            ## mode(cnew) <- "double"
Marco De Lucia's avatar
Marco De Lucia committed
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
        }

        return(cnew)
    }
}

##' Makes sure that we get activities instead of pH/pe
##'
##' The manipulation of the state should be in place
##' @title Ensure that pH/pe are transformed into activities
##' @param state a chemical state (coerced to data.matrix)
##' @return the state where, if necessary, pH is transformed into activity(H+)
##' @author MDL
##' @export
pH2Act <- function(state){
94
95
96
97
98
99
100
    rem_attr <- attributes(state)
    
    stateNames <- colnames(state)
    if (mode(state)!="double" & length(dim(state)) ==2) {
        mat <- apply(state, 2, as.numeric)
    } else {
        mat <- data.matrix(state)
Marco De Lucia's avatar
Marco De Lucia committed
101
    }
102
103

    if ( "pH" %in% stateNames) {
Marco De Lucia's avatar
Marco De Lucia committed
104
        mat[,"pH"] <- 10^(-mat[,"pH"])
105
106
107
        stateNames[stateNames=="pH"] <- "H"
   }
    if ( "pe" %in% stateNames) { 
Marco De Lucia's avatar
Marco De Lucia committed
108
        mat[,"pe"] <- 10^(-mat[,"pe"])
109
        stateNames[stateNames=="pe"] <- "e"
Marco De Lucia's avatar
Marco De Lucia committed
110
111
    }

112
113
114
    if ("immobile" %in% names(rem_attr)) attr(mat,"immobile") <- rem_attr$immobile
    if ("index" %in% names(rem_attr)) attr(mat,"index") <- rem_attr$index
    colnames(mat) <- stateNames
Marco De Lucia's avatar
Marco De Lucia committed
115
116
117
118
119
120
121
122
123
124
125
126
127
    return(mat)
}

##' Makes sure that we get pH/pe instead of activities
##'
##' The manipulation of the state should be in place
##' @title Ensure that activities are transformed into pH/pe
##' @param state a chemical state (coerced to data.matrix)
##' @return the state where, if necessary, activity(H+) is transformed
##'     into pH
##' @author MDL
##' @export
Act2pH <- function(state){
128
129
130
    rem_attr <- attributes(state)

    stateNames <- colnames(state)
Marco De Lucia's avatar
Marco De Lucia committed
131

132
133
134
135
    if (mode(state)!="double" & length(dim(state)) ==2) {
        mat <- apply(state, 2, as.numeric)
    } else {
        mat <- data.matrix(state)
Marco De Lucia's avatar
Marco De Lucia committed
136
    }
137
    if ( "H" %in% stateNames) {
Marco De Lucia's avatar
Marco De Lucia committed
138
        mat[,"H"] <- -log10(mat[,"H"])
139
        stateNames[stateNames=="H"] <- "pH"
Marco De Lucia's avatar
Marco De Lucia committed
140
    }
141
    if ( "e" %in% stateNames) { 
Marco De Lucia's avatar
Marco De Lucia committed
142
        mat[,"e"] <- -log10(mat[,"e"])
143
        stateNames[stateNames=="e"] <- "pe"
Marco De Lucia's avatar
Marco De Lucia committed
144
145
    }

146
147
148
    if ("immobile" %in% names(rem_attr)) attr(mat,"immobile") <- rem_attr$immobile
    if ("index" %in% names(rem_attr)) attr(mat,"index") <- rem_attr$index
    colnames(mat) <- stateNames
Marco De Lucia's avatar
Marco De Lucia committed
149
150
151
152
153
154
155
156
157
158
159
160
    return(mat)
}


##' @title Recompose a chemical state after reduction
##' @param state the state to be recomposed
##' @param reduced the initial reduced state containing the "index" attribute
##' @return a state with the original (full-grid) number of elements
##' @author MDL
##' @export
RecomposeState <- function(state, reduced)
{
161
162
163
164
165
166
167
168
169
170
    if ("index" %in% names(attributes(reduced))) {
        inds <- attr(reduced,"index")
        ## msg("print state:")
        ## print(state)
        ## msg("print inds:")
        ## print(inds)
    } else stopmsg("no reduction indeces!")
    res <- state[inds,]
    attr(res,"immobile") <- attr(reduced,"immobile")
    return(res)
Marco De Lucia's avatar
Marco De Lucia committed
171
172
}

173
174
##' Applies \code{mgcv::uniquecombs} to the matrix representing the
##' current chemical state.
Marco De Lucia's avatar
Marco De Lucia committed
175
176
##' @title Finds unique problems in a geochemical state
##' @param data the matrix
177
178
179
##' @return a matrix with attached attribute containing the indeces of
##'     the compressed matrix for later restoring and the "immobile"
##'     indices
Marco De Lucia's avatar
Marco De Lucia committed
180
181
182
183
##' @author MDL
##' @export
ReduceState <- function(data)
{
184
185
186
187
188
189
190
191
    red <- mgcv::uniquecombs(data)
    colnames(red) <- colnames(data)
    attr(red,"immobile") <- attr(data,"immobile") 
    return(red)
}



192
##' Cfr \code{demo}
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
##' @title 1D Reactive transport simulations using phreeqc or a
##'     surrogate, equilibrium simulations
##' @param setup a list with several input parameters
##' @param init optional, initial state of the simulation
##' @param maxtime depending on \code{step}, the simulation time
##' @param step one of c("time","iter","fix_dt"), defines the time
##'     stepping
##' @param db the chemical database
##' @param procs number of processors to be used in parallel execution
##' @param maxsim number of simulations above which parallelization is
##'     triggered
##' @param reduce logical, defaults to TRUE. Should we find the unique
##'     occurences of a geochemical problem?
##' @param ebreak logical, defaults to FALSE. If TRUE, an early break
##'     is invoked
208
209
##' @param writeout logical, defaults to FALSE. Should we write the
##'     phreeqc output on disk?
210
211
212
##' @param surrogate logical, defaults to FALSE. Should we use
##'     surrogates instead of phreeqc?
##' @param surrogate.FUN The function which calls the surrogate
213
214
##' @param model a list containing all surrogate models, one element
##'     per variable
215
##' @param baleq data structure containing the balance equations
216
217
##' @param tol absolute tolerance on the mass balance. If this is
##'     trespassed, phreeqc is called instead
218
219
##' @param call_pqc logical, defaults to TRUE: if TRUE, calls phreeqc
##'     on the surrogate simulations which trespass the tolerance
220
221
222
223
##' @return a list containing lots of stuff. Each element of the list
##'     represent an iteration and has two elements: $T
##'     (concentrations after the transport) and $C, concentrations
##'     after the chemistry
224
##' @author MDL
225
##' @export
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
ReactTranspBalanceEq <- function(setup, init, maxtime, step=c("time","iter","fix_dt"),
                                 db="phreeqc.dat", procs=1, maxsim=150,
                                 reduce=TRUE, ebreak=FALSE, writeout=FALSE, surrogate=FALSE,
                                 surrogate.FUN, model, baleq, tol=1E-9, call_pqc=TRUE)
    
{
    if (surrogate) {
        if (missing(surrogate.FUN))
            stopmsg("Need a valid function to apply surrogate instead of chemistry")
        if (is.function(surrogate.FUN))
            Selected.Surrogate <- surrogate.FUN
    }
    
    step <- match.arg(step)
    ## prototype of the PHREEQC simulation
    base     <- setup$base
    ## SELECTED_OUTPUT and USER_PUNCH sections of the PHREEQC input data
    first    <- setup$first
    ## boundary conditions
    bound    <- setup$bound
    ## prop are the names of the properties for PHREEQC
    prop     <- setup$prop
    immobile <- setup$immobile
    initsim  <- setup$initsim

251
    if (procs > 1) {
252
        if (Sys.info()[["sysname"]]=="Windows") {
253
            ThisRunCluster <- parallel::makePSOCKcluster(procs)
254
        } else {
255
            ThisRunCluster <- parallel::makeForkCluster(procs)
256
257
258
        }

        doParallel::registerDoParallel(ThisRunCluster)
259
260
261
        msg("Registered default doParallel cluster with ", procs, "nodes")
    }

262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
    n  <- setup$n
    L  <- setup$len  # m
    U  <- setup$U    # m3/s
    dx <- L/n        # m
    dt <- dx/U * setup$Cf
    msg("Pure advection 1D simulations")

    ## Find out the number of iterations we're about to calculate
    if (step=="fix_dt"){
        dt <- setup$dt
        maxiter <- floor(maxtime/dt)+1
        msg("Fixed time step of ", dt, ". ", maxiter, " iterations required to reach enditme of", maxtime)
    }
    
    if (step=="time") {
        maxiter <- floor(maxtime/dt)+1
        msg("Courant number=", U*dt/dx, "; dt=",dt)
        msg(maxiter,"iterations required") 
    }
    
    if (step=="iter") {
        maxiter <- maxtime
        msg("Courant number=", U*dt/dx, "; dt=",dt,";")
        msg("Simulation will end at approx ",floor(maxiter*dt),"secs") 
    }
    
    ## MDL: add "complete" list. This will be a list of lists. Each
    ## iteration is a 3-components sublist storing state_T (the input
    ## to chemistry, after transport step), state_C (the input to
    ## transport, after chemistry), and the whole PHREEQC output.
    saved_complete <- vector(mode="list",length=maxiter+1)
    pad <- floor(log10(maxiter+1))+1 ## to properly format the step names
    spr_string <- paste0("%0", pad,"d")
    names(saved_complete) <- paste0("step_",sprintf(spr_string,seq(0,maxiter)))
296
297
298
299

    out_inp <- vector(mode="list",length=length(maxiter))
    out_res <- vector(mode="list",length=length(maxiter))
    out_bal <- vector(mode="list",length=length(maxiter))
300
301
302
303
304
305

    timing <- matrix(NA,ncol=5,nrow=maxiter)

    msg("Loading phreeqc db... ")
    phreeqc::phrLoadDatabaseString(db)
    msg("database loaded.")
306
307
308
309

    if (procs > 1) {
        parallel::clusterCall(cl=ThisRunCluster, phreeqc::phrLoadDatabaseString, db)
        msg("Database loaded on each worker.")
310
311
312
313
        if (surrogate) {
            parallel::clusterExport(cl=ThisRunCluster, varlist=c("model","surrogate.FUN"), envir = environment())
            msg("All workers are setup with the surrogate model.")
        }
314
315
    }

316
317
318
319
320
321
322
323
324
325
326
327
    if (missing(init)) {
        msg("missing initial chemical state")
        if (!is.character(initsim))
            runinitsim <- c(first,base)
        else
            runinitsim <- initsim
        
        msg("running initsim PHREEQC script assuming EQUILIBRIUM and correct PUNCH specifications...")
        tmpfirstrun <- RunPQC(runinitsim)
        msg("assuming homogeneous medium")
        msg("Names gathered after first run:", paste(colnames(tmpfirstrun), collapse=", "))
        state_C <- matrix(rep(tmpfirstrun,n), byrow=TRUE, ncol=length(prop))
328
        colnames(state_C) <- colnames(tmpfirstrun)
329
    } else {
330
        msg("given initial state; assuming correct colnames")
331
332
333
334
335
336
337
338
339
340
341
        state_C <- init
    }
    
    attr(state_C,"immobile") <- immobile

    index_saved <- 1
    saved_complete[[index_saved]] <- list(C=Act2pH(state_C))
    index_saved <- index_saved + 1
    
    ## transform the pH/pe back into activities for Advection
    msg("first step, Advection()...")
342
    state_T <- AdvectionPQC(pH2Act(state_C), inflow=bound, dx=dx, dt=dt, U=U)
343
344
345
346
347
    msg("Advection done...")
    Tr <- Ts <- 0
    
    ## treat special H+/pH, e-/pe cases
    state_T <- Act2pH(state_T)
Marco De Lucia's avatar
Marco De Lucia committed
348
    
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
    if (reduce)
        Tr <- system.time( reduced <- ReduceState(state_T) )[3]
    else
        reduced <- state_T
    
    msg("first step, chemistry ...")
    Ts <- system.time(inplist <- splitMultiFix(data=reduced, procs=procs, base=base, first=first,
                                               prop=prop, minerals=immobile, nmax=maxsim))[3]
    
    Tm <- system.time(res_C <- RunPQC(inplist, procs=procs))[3]
    attr(res_C,"immobile") <- immobile
    
    msg("iteration 0, CPU-time = ",round(Tm,3),"[s]")

    ## recompose after the reduction
    if (reduce)
        Tr <- Tr + system.time(state_C <- RecomposeState(Act2pH(res_C), reduced))[3]
    else {
        state_C <- res_C
        immobile_col_names <- setup$prop[setup$immobile]
        immobile_inds <- match( immobile_col_names, colnames(state_C) )
        attr(state_C,"immobile") <- immobile_inds
    }
    
    saved_complete[[index_saved]] <- list(T=Act2pH(state_T), C=Act2pH(state_C))
    index_saved <- index_saved + 1
375
376
    out_inp[[1]] <- inplist
    out_res[[1]] <- res_C
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
    
    if (ebreak) {
        msg("Early break invoked, bye.")
        return(saved_complete[[1]])
    }
    time <- dt
    iter <- 1

    timing[iter,] <- c(dim(reduced)[1],procs,Tm, Tr, Ts)

    msg("going through iterations now!")
    
    on.exit({
        msg("MAIN LOOP interrupted during iteration",iter,"!!")
        msg("returning last calculated chemistry.")
        print(traceback())
        msg(" Bye.")
        attr(state_C,"timing") <- timing
        saved_complete$currentT <- Act2pH(state_T)
        saved_complete$current <- Act2pH(state_C)
        return(saved_complete)
    })
    
    ## start of the iterations
    while (iter<maxiter) {
        ## iterations
        # TODO: It seems to me that iteration numberins is off by one, 
        # so that the first run on Rphree is actually one and the next ones end
        time <- time + dt
        iter <- iter + 1

        ## transform the pH/pe back into activities
409
        state_T <- AdvectionPQC(pH2Act(state_C), inflow=bound, dx=dx, dt=dt, U=U)
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427

        ## treat special H+/pH, e-/pe cases
        state_T <- Act2pH(state_T)

        ## reduction of the problem
        if(reduce)
            Tr <- system.time(reduced <- ReduceState(state_T))[3]
        else
            reduced <- state_T

        ## this is the place where we switch the simulator to the surrogate model
        ## for final test for the geochemical surrogate model!
        
        if (surrogate) { ## use a surrogate model created by the Surrogate playground script
            Ts <- 0.0        ## no split and distribute for this guy

            Tm <- system.time( tmpsur <- Selected.Surrogate(reduced, model=model) )[3]
            bal <- CheckBalance(baleq,reduced,tmpsur)
428
            out_bal[[iter]] <- bal
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
            totbal <- apply(bal, 1, mae)

            nonok <- which(totbal>tol) ### max(tol,tol2))
            if (length(nonok)>0){
                msg("found", length(nonok), "/", nrow(reduced), " offending simulations",
                    ifelse(call_pqc, ", calling PHREEQC"," "))
                if (call_pqc) {
                    reducednonok <- reduced[nonok, ]
                    Ts <- Ts + system.time(inplist <- splitMultiFix(data=reducednonok, procs=procs, base=base, first=first,
                                                                    prop=prop, minerals=immobile, nmax=maxsim))[3]
                    
                    out_inp[[iter]] <- inplist
                    if (length(nonok)==1) {
                        Tm <- Tm + system.time(pqcres_C <- RunPQC(inplist, procs=1))[3]
                    } else {
                        Tm <- Tm + system.time(pqcres_C <- RunPQC(input=inplist, procs=procs))[3]
                    }
                    ##mat <- apply(state, 2, as.numeric)

                    tmpsur[nonok, ] <- pqcres_C
                }
            }
            res_C <- tmpsur
            attr(res_C,"immobile") <- immobile
        } else { ## no surrogate, the original implementation
            ## new input
            Ts <- system.time(inplist <- splitMultiFix(data=reduced, procs=procs, base=base, first=first,
                                                       prop=prop, minerals=immobile, nmax=maxsim))[3]
457
            out_inp[[iter]] <- inplist
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
            Tm <- system.time(res_C <- RunPQC(inplist, procs=procs))[3]
            attr(res_C,"immobile") <- immobile
        }
        
        ## recompose after the reduction
        if (reduce)
            Tr <- Tr + system.time(state_C <- RecomposeState(res_C, reduced))[3]
        else {
            state_C <- res_C
            immobile_col_names <- setup$prop[setup$immobile]
            immobile_inds <- match( immobile_col_names, colnames(state_C) )
            attr(state_C,"immobile") <- immobile_inds
        }
        
        ## ## transform the pH/pe back into activities
        ## state_C <- pH2Act(state_C)

475
        out_res[[iter]] <- res_C
476
477
478
479
480
481
482
483
484

        timing[iter,] <- c( dim(reduced)[1], procs, Tm, Tr, Ts)
        msg("done iteration", iter, "/", maxiter," CPU-time ",round(Tm,3),"[s]")

        saved_complete[[index_saved]] <- list(T=Act2pH(state_T), C=Act2pH(state_C))
        index_saved <- index_saved + 1
        
    }
    on.exit()
485

486
    if (procs > 1) {
487
        parallel::stopCluster(ThisRunCluster)
488
489
490
        msg("Parallelization cluster stopped")
    }

491
492
493
494
    msg("assign out_balance, out_input and out_results to global environment for possible later use")
    assign("out_balance", out_bal, envir = .GlobalEnv)
    assign("out_results", out_res, envir = .GlobalEnv)
    assign("out_input",   out_inp, envir = .GlobalEnv)
495

496
497
498
499
500
501
    msg("total time of chemistry: ",round(sum(timing[,3]),3),"seconds")
    msg("total number of simulations: ",sum(timing[,1]))
    msg(" Bye.")
    
    attr(saved_complete,"timing") <- timing
    return(saved_complete)
Marco De Lucia's avatar
Marco De Lucia committed
502
503
}

504

Marco De Lucia's avatar
Marco De Lucia committed
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
##' Please look at the provided example
##' @title 1D Reactive transport simulations using phreeqc or a
##'     surrogate
##' @param setup a list with several input parameters
##' @param init optional, initial state of the simulation
##' @param maxtime depending on \code{step}, the simulation time
##' @param step one of c("time","iter","fix_dt"), defines the time
##'     stepping
##' @param db the chemical database
##' @param procs number of processors to be used in parallel execution
##' @param maxsim number of simulations above which parallelization is
##'     triggered
##' @param reduce logical, defaults to TRUE. Should we find the unique
##'     occurences of a geochemical problem?
##' @param ebreak logical, defaults to FALSE. If TRUE, an early break
##'     is invoked
##' @param writeout logical, defaults to FALSE.
##' @param surrogate logical, defaults to FALSE. Should we use
##'     surrogates instead of phreeqc?
##' @param surrogate.FUN The function which calls the surrogate
##' @param model The list of surrogate models, one element per
##'     variable
##' @param baleq data structure containing the balance equations
##' @param tol absolute tolerance on the mass balance
##' @param call_pqc logical, defaults to TRUE: if TRUE, calls phreeqc
##'     on the surrogate simulations which trespass the tolerance
##' @return a list containing lots of stuff
##' @author MDL
##' @export 
ReactTranspBalanceKin <- function(setup, init, maxtime, step=c("time","iter","fix_dt"),
                                  db="phreeqc.dat", procs=1, maxsim=150,
                                  reduce=TRUE, ebreak=FALSE, writeout=FALSE, surrogate=FALSE,
                                  surrogate.FUN, model, baleq, tol=1E-9, call_pqc=TRUE)
    
{

    if (surrogate) {
        if (missing(surrogate.FUN))
543
            stopmsg("Need a valid function to apply surrogate instead of chemistry")
Marco De Lucia's avatar
Marco De Lucia committed
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
        if (is.function(surrogate.FUN))
            Selected.Surrogate <- surrogate.FUN
    }
    step <- match.arg(step)
    ## prototype of the PHREEQC simulation
    base     <- setup$base
    ## SELECTED_OUTPUT and USER_PUNCH sections of the PHREEQC input data
    first    <- setup$first
    ## boundary conditions
    bound    <- setup$bound
    ## prop are the names of the properties for PHREEQC
    prop     <- setup$prop
    immobile <- setup$immobile
    kin      <- setup$kin
    initsim  <- setup$initsim
    ann      <- setup$ann
560

561

562
    if (procs > 1) {
563
        if (Sys.info()[["sysname"]]=="Windows") {
564
            ThisRunCluster <- parallel::makePSOCKcluster(procs)
565
        } else {
566
            ThisRunCluster <- parallel::makeForkCluster(procs)
567
568
569
        }

        doParallel::registerDoParallel(ThisRunCluster)
570
571
572
        msg("Registered default doParallel cluster with ", procs, "nodes")
    }

Marco De Lucia's avatar
Marco De Lucia committed
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
    n  <- setup$n
    L  <- setup$len  # m
    U  <- setup$U    # m3/s
    dx <- L/n        # m
    dt <- floor(dx/U * setup$Cf)
    msg("convective simulations")
    
    ## Find out the number of iterations we're about to calculate
    if (step=="fix_dt"){
        dt <- setup$dt
        maxiter <- floor(maxtime/dt)+1
        msg("Fixed time step of ", dt, ". ", maxiter, " iterations required to reach enditme of", maxtime)
    }
        
    if (step=="time") {
        maxiter <- floor(maxtime/dt)+1
        msg("Courant number=", U*dt/dx, "; dt=",dt)
        msg(maxiter,"iterations required") 
    }

    if (step=="iter") {
        maxiter <- maxtime
        msg("Courant number=", U*dt/dx, "; dt=",dt,";")
        msg("Simulation will end at approx ",floor(maxiter*dt),"secs") 
    }

    ## MDL: add "complete" list. This will be a list of lists. Each
    ## iteration is a 3-components sublist storing state_T (the input
    ## to chemistry, after transport step), state_C (the input to
    ## transport, after chemistry), and the whole PHREEQC output.
    saved_complete <- vector(mode="list",length=maxiter+1)
    pad <- floor(log10(maxiter+1))+1 ## to properly format the step names
    spr_string <- paste0("%0", pad,"d")
    names(saved_complete) <- paste0("step_",sprintf(spr_string,seq(0,maxiter)))
607
608
609
610

    out_inp <- vector(mode="list",length=length(maxiter))
    out_res <- vector(mode="list",length=length(maxiter))
    out_bal <- vector(mode="list",length=length(maxiter))
Marco De Lucia's avatar
Marco De Lucia committed
611
612
613
    
    timing <- matrix(NA,ncol=5,nrow=maxiter)
    msg("Loading phreeqc db... ")
614
    phreeqc::phrLoadDatabaseString(db)
Marco De Lucia's avatar
Marco De Lucia committed
615
    msg("database loaded.")
616
617
    if (procs > 1) {
        parallel::clusterCall(cl=ThisRunCluster, phreeqc::phrLoadDatabaseString, db)
618
619
620
621
        if (surrogate) {
            parallel::clusterExport(cl=ThisRunCluster, varlist=c("model","surrogate.FUN"), envir = environment())
            msg("All workers are setup with the surrogate model.")
        }
622
    }
Marco De Lucia's avatar
Marco De Lucia committed
623
624
    if (missing(init)) {
        msg("missing initial chemical state")
625
626
        if (is.null(initsim))
            initsim <- c(first,base)
Marco De Lucia's avatar
Marco De Lucia committed
627
628

        msg("running initsim PHREEQC script assuming EQUILIBRIUM and correct PUNCH specifications...")
629
        tmpfirstrun <- RunPQC(initsim)
Marco De Lucia's avatar
Marco De Lucia committed
630
        msg("assuming homogeneous medium")
631
        state_C <- matrix(rep(tmpfirstrun,n), byrow=TRUE, ncol=length(prop))
632
        colnames(state_C) <- colnames(tmpfirstrun)
Marco De Lucia's avatar
Marco De Lucia committed
633
    } else {
634
        msg("given initial state; assuming correct colnames")
Marco De Lucia's avatar
Marco De Lucia committed
635
636
        state_C <- init
    }
637

Marco De Lucia's avatar
Marco De Lucia committed
638
639
640
641
642
643
644
645
646
    attr(state_C,"immobile") <- immobile

    ## we save with pe/pH
    index_saved <- 1
    saved_complete[[index_saved]] <- list(C=Act2pH(state_C))
    index_saved <- index_saved + 1

    ## transform the pH/pe back into activities for Advection
    msg("first step, Advection()...")
647
    state_T <- AdvectionPQC(pH2Act(state_C), inflow=bound, dx=dx, dt=dt, U=U)
Marco De Lucia's avatar
Marco De Lucia committed
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
    Tr <- Ts <- 0

    ## treat special H+/pH, e-/pe cases
    state_T <- Act2pH(state_T)
  
    if (reduce)
        Tr <- system.time( reduced <- ReduceState(state_T) )[3]
    else
        reduced <- state_T
    
    msg("first step, chemistry ...")
    Ts <- system.time(inplist <- splitMultiKin(data=reduced, procs=procs, base=base, first=first, ann=ann, 
                                               prop=prop, minerals=immobile, kin=kin, dt=dt, nmax=maxsim))[3]
    
    Tm <- system.time(tmp <- RunPQC(inplist, procs=procs))[3]
    res_C <- tmp[, prop]
    attr(res_C,"immobile") <- immobile
    
    msg("iteration 0, CPU-time = ",round(Tm,3),"[s]")

    ## recompose after the reduction
    if (reduce)
        Tr <- Tr + system.time(state_C <- RecomposeState(res_C, reduced))[3]
    else {
        state_C <- res_C
        immobile_col_names <- setup$prop[setup$immobile]
        immobile_inds <- match( immobile_col_names, colnames(state_C) )
        attr(state_C,"immobile") <- immobile_inds
    }
    
    saved_complete[[index_saved]] <- list(T=Act2pH(state_T), C=Act2pH(state_C))
    index_saved <- index_saved + 1
680
681
    out_inp[[1]] <- inplist
    out_res[[1]] <- res_C
Marco De Lucia's avatar
Marco De Lucia committed
682
683
684
685
686
687
688
    
    if (ebreak) {
        msg("Early break invoked, bye.")
        return(saved_complete[[1]])
    }
    time <- dt
    iter <- 1
689

Marco De Lucia's avatar
Marco De Lucia committed
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
    timing[iter,] <- c(dim(reduced)[1],procs,Tm, Tr, Ts)
    
    msg("going through iterations now!")
    
    on.exit({
        msg("MAIN LOOP interrupted during iteration",iter,"!!")
        msg("returning last calculated chemistry.")
        print(traceback())
        msg(" Bye.")
        attr(state_C,"timing") <- timing
        saved_complete$currentT <- Act2pH(state_T)
        saved_complete$current <- Act2pH(state_C)
        return(saved_complete)
    })
    
    ## start of the iterations
    while (iter<maxiter) {
        ## iterations
        # TODO: It seems to me that iteration numberins is off by one, 
        # so that the first run on Rphree is actually one and the next ones end
        time <- time + dt
        iter <- iter + 1

        ## transform the pH/pe back into activities
714
        state_T <- AdvectionPQC(pH2Act(state_C), inflow=bound, dx=dx, dt=dt, U=U)
Marco De Lucia's avatar
Marco De Lucia committed
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733

        ## reduction of the problem
        if(reduce)
            Tr <- system.time(reduced <- ReduceState(state_T))[3]
        else
            reduced <- state_T

        ## transform the activity back into pH/pe
        reduced <- Act2pH(reduced)

        ## this is the place where we switch the simulator to the surrogate model
        ## for final test for the geochemical surrogate model!
        
        if (surrogate) { ## use a surrogate model created by the Surrogate playground script
            Ts <- 0.0        ## no split and distribute for this guy
            
            Tm <- system.time( tmpsur <- Selected.Surrogate(reduced, model=model) )[3]
            ## print(head(tmpsur))
            bal <- CheckBalance(baleq,reduced,tmpsur)
734
            out_bal[[iter]] <- bal
Marco De Lucia's avatar
Marco De Lucia committed
735
736
737
738
739
740
741
742
743
744
745
746
747
748
            totbal <- apply(bal, 1, mae)
            
            ## print(summary(totbal))
            ## tol2 <- unname(quantile(totbal, .85))
            
            nonok <- which(totbal>tol)
            if (length(nonok)>0){
                msg("found", length(nonok), "/", nrow(reduced), " offending simulations",
                    ifelse(call_pqc, ", calling PHREEQC"," "))
                if (call_pqc) {
                    reducednonok <- reduced[nonok, ]
                    Ts <- Ts + system.time(inplist <- splitMultiKin(data=reducednonok, procs=procs, base=base, first=first, ann=ann,
                                                                    prop=prop, minerals=immobile, kin=kin, dt=dt, nmax=maxsim))[3]
                    
749
                    out_inp[[iter]] <- inplist
Marco De Lucia's avatar
Marco De Lucia committed
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
                    if (length(nonok)==1) {
                        Tm <- Tm + system.time(tmp2 <- RunPQC(inplist, procs=1))[3]
                        pqcres_C <- tmp2[,prop]
                    } else {
                        Tm <- Tm + system.time(tmp2 <- RunPQC(input=inplist, procs=procs))[3]
                        pqcres_C <- tmp2[,prop]
                    }
                    
                    tmpsur[nonok, ] <- pqcres_C
                }
            } 
            res_C <- tmpsur
            attr(res_C,"immobile") <- immobile
        } else { ## no surrogate, the original implementation
            ## new input
            Ts <- system.time(inplist <- splitMultiKin(data=reduced, procs=procs, base=base, first=first, ann=ann,
                                                       prop=prop, minerals=immobile, kin=kin, dt=dt, nmax=maxsim))[3]
767
            out_inp[[iter]] <- inplist
Marco De Lucia's avatar
Marco De Lucia committed
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
            Tm <- system.time(tmp3 <- RunPQC(inplist, procs=procs))[3]
            res_C <- tmp3[,prop]
            attr(res_C,"immobile") <- immobile
        }
        
        ## recompose after the reduction
        if (reduce)
            Tr <- Tr + system.time(state_C <- RecomposeState(res_C, reduced))[3]
        else {
            state_C <- res_C
            immobile_col_names <- setup$prop[setup$immobile]
            immobile_inds <- match( immobile_col_names, colnames(state_C) )
            attr(state_C,"immobile") <- immobile_inds
        }

        ## transform the pH/pe back into activities
        state_C <- pH2Act(state_C)

786
        out_res[[iter]] <- res_C
Marco De Lucia's avatar
Marco De Lucia committed
787
788
789
790
791
792
793
794
795
     
        timing[iter,] <- c( dim(reduced)[1], procs, Tm, Tr, Ts)
        msg("done iteration", iter, "/", maxiter," CPU-time ",round(Tm,3),"[s]")
        
        saved_complete[[index_saved]] <- list(T=Act2pH(state_T), C=Act2pH(state_C))
        index_saved <- index_saved + 1
        
    }
    on.exit()
796
797

    if (procs > 1) {
798
        parallel::stopCluster(ThisRunCluster)
799
800
801
        msg("Parallelization cluster stopped")
    }

802
803
804
805
806
    msg("assign out_balance, out_input and out_results to global environment for possible later use")
    assign("out_balance", out_bal, envir = .GlobalEnv)
    assign("out_results", out_res, envir = .GlobalEnv)
    assign("out_input",   out_inp, envir = .GlobalEnv)

Marco De Lucia's avatar
Marco De Lucia committed
807
808
809
810
811
812
813
814
    msg("total time of chemistry: ",round(sum(timing[,3]),3),"seconds")
    msg("total number of simulations: ",sum(timing[,1]))
    msg(" Bye.")
    
    attr(saved_complete,"timing") <- timing
    #cat("PASS iteration saving ")
    return(saved_complete)
}