Commit 7410300f authored by Marco De Lucia's avatar Marco De Lucia
Browse files

migration to doParallel in ReactTrans and Kinetics

parent a68631f9
## Functions for dealing with simulations with kinetics
### Marco De Lucia, delucia@gfz-potsdam.de, 2009-2018
### Time-stamp: "Last modified 2018-05-16 16:45:40 delucia"
### Time-stamp: "Last modified 2018-05-17 15:52:34 delucia"
##' This function just runs the generated input buffer - or a list
##' thereof - through \code{phreeqc}. Obviously it requires the
......@@ -50,12 +50,13 @@ RunPQC <- function(input, procs=1) {
## old one
## res <- parallel::mclapply(input, .runPQC, mc.silent=TRUE, mc.cores=procs)
res <- parallel::parLapply(input, .runPQC)
## res <- parallel::parLapply(ThisRunCluster, input, .runPQC)
res <- foreach(i=seq_along(input), .combine=rbind) %dopar% .runPQC(input[[i]])
## a is the string containing the rbind of each element of the list
a <- paste("rbind(",paste("res[[",1:procs,"]]",sep="", collapse = ", "),")")
## let evaluate this expression
res <- eval(parse(text=a))
## ## a is the string containing the rbind of each element of the list
## a <- paste("rbind(",paste("res[[",1:procs,"]]",sep="", collapse = ", "),")")
## ## let evaluate this expression
## res <- eval(parse(text=a))
}
return(res)
}
......
## Functions for dealing with surrogate simulations
### Marco De Lucia, delucia@gfz-potsdam.de, 2009-2018
### Time-stamp: "Last modified 2018-05-16 17:58:01 delucia"
### Time-stamp: "Last modified 2018-05-17 16:11:49 delucia"
##' Todo
......@@ -244,7 +244,13 @@ ReactTranspBalanceEq <- function(setup, init, maxtime, step=c("time","iter","fix
initsim <- setup$initsim
if (procs > 1) {
doParallel::registerDoParallel(procs)
if (Sys.info()[["sysname"]]=="Windows") {
ThisRunCluster <<- parallel::makePSOCKcluster(procs)
} else {
ThisRunCluster <<- parallel::makeForkCluster(procs)
}
doParallel::registerDoParallel(ThisRunCluster)
msg("Registered default doParallel cluster with ", procs, "nodes")
}
......@@ -293,7 +299,12 @@ ReactTranspBalanceEq <- function(setup, init, maxtime, step=c("time","iter","fix
msg("Loading phreeqc db... ")
phreeqc::phrLoadDatabaseString(db)
msg("database loaded.")
if (procs > 1) {
parallel::clusterCall(cl=ThisRunCluster, phreeqc::phrLoadDatabaseString, db)
msg("Database loaded on each worker.")
}
if (missing(init)) {
msg("missing initial chemical state")
if (!is.character(initsim))
......@@ -466,6 +477,12 @@ ReactTranspBalanceEq <- function(setup, init, maxtime, step=c("time","iter","fix
}
on.exit()
if (procs > 1) {
stopCluster(ThisRunCluster)
msg("Parallelization cluster stopped")
}
msg("total time of chemistry: ",round(sum(timing[,3]),3),"seconds")
msg("total number of simulations: ",sum(timing[,1]))
msg(" Bye.")
......@@ -531,8 +548,15 @@ ReactTranspBalanceKin <- function(setup, init, maxtime, step=c("time","iter","fi
initsim <- setup$initsim
ann <- setup$ann
if (procs > 1) {
doParallel::registerDoParallel(procs)
if (Sys.info()[["sysname"]]=="Windows") {
ThisRunCluster <<- parallel::makePSOCKcluster(procs)
} else {
ThisRunCluster <<- parallel::makeForkCluster(procs)
}
doParallel::registerDoParallel(ThisRunCluster)
msg("Registered default doParallel cluster with ", procs, "nodes")
}
......@@ -579,6 +603,9 @@ ReactTranspBalanceKin <- function(setup, init, maxtime, step=c("time","iter","fi
msg("Loading phreeqc db... ")
phreeqc::phrLoadDatabaseString(db)
msg("database loaded.")
if (procs > 1) {
parallel::clusterCall(cl=ThisRunCluster, phreeqc::phrLoadDatabaseString, db)
}
if (missing(init)) {
msg("missing initial chemical state")
if (is.null(initsim))
......@@ -755,6 +782,12 @@ ReactTranspBalanceKin <- function(setup, init, maxtime, step=c("time","iter","fi
}
on.exit()
if (procs > 1) {
stopCluster(ThisRunCluster)
msg("Parallelization cluster stopped")
}
msg("total time of chemistry: ",round(sum(timing[,3]),3),"seconds")
msg("total number of simulations: ",sum(timing[,1]))
msg(" Bye.")
......
......@@ -87,9 +87,13 @@ fitControl <- trainControl(method = "none",
predictionBounds=rep(TRUE,2))
modk <- parallel::mclapply(colnames(sam$result), function (x)
return(train(x=sam$design, y=sam$result[,x],trControl = fitControl, ##tuneGrid = tg,
method="parRF")), mc.cores=8)
doParallel::registerDoParallel(4)
## modk <- parallel::mclapply(colnames(sam$result), function (x)
## return(train(x=sam$design, y=sam$result[,x],trControl = fitControl, ##tuneGrid = tg,
## method="parRF")), mc.cores=8)
modk <- foreach(i=colnames(sam$result)) %dopar% caret::train(x=sam$design, y=sam$result[,i], trControl = fitControl, method="parRF")
names(modk) <- colnames(sam$result)
saveRDS(modk, "models_kinetics_RF.rds")
......@@ -117,7 +121,7 @@ MySurrKin <- function(state, model){
order_for_surrogate <- c("C","Ca","Cl","Mg","pH","pe","Calcite","Dolomite")
rem_attr <- attr(state,"immobile")
ss <- des[, order_for_surrogate]
pred <- lapply(names(model), function(x) as.numeric(predict.train(model[[x]], ss)))
pred <- parLapply(ThisRunCluster, names(model), function(x) as.numeric(predict.train(model[[x]], ss)))
out <- cbind(as.data.frame(pred), state[,excl])
colnames(out) <- c(names(model),"O2g")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment