Skip to content
Snippets Groups Projects
Commit ba5dac9e authored by Adam Welc's avatar Adam Welc
Browse files

Fixed a problem with visibilty of functions re-defined in the parallel package.

parent 0efed73c
Branches
No related tags found
No related merge requests found
......@@ -414,7 +414,9 @@ final class REngine implements Engine, Engine.Timings {
}
private Object evalNode(RNode exprRep, REnvironment envir, int depth) {
RNode n = exprRep;
// we need to copy the node, otherwise it (and its children) will specialized to a specific
// frame descriptor and will fail on subsequent re-executions
RNode n = (RNode) exprRep.deepCopy();
RootCallTarget callTarget = doMakeCallTarget(n, EVAL_FUNCTION_NAME);
RCaller call = RArguments.getCall(envir.getFrame());
return evalTarget(callTarget, call, envir, depth);
......
......@@ -15,18 +15,22 @@
## Derived from snow and parallel packages
eval(expression(
closeNode.SHAREDnode <- function(node) {
fastr:::fastr.channel.close(node$channel)
}
}), asNamespace("parallel"))
eval(expression(
sendData.SHAREDnode <- function(node, data) {
fastr:::fastr.channel.send(node$channel, data)
}
}), asNamespace("parallel"))
eval(expression(
recvData.SHAREDnode <- function(node) {
fastr:::fastr.channel.receive(node$channel)
}
}), asNamespace("parallel"))
eval(expression(
recvOneData.SHAREDcluster <- function(cl) {
channel_ids = lapply(cl, function(l) l[["channel"]])
res <- fastr:::fastr.channel.select(channel_ids)
......@@ -35,8 +39,9 @@ recvOneData.SHAREDcluster <- function(cl) {
indexes = lapply(cl, function(l, id) if (identical(l[["channel"]], id)) id else as.integer(NA), id=selected_id)
node_ind = which(as.double(indexes)==as.double(selected_id))
list(node = node_ind, value = res[[2]])
}
}), asNamespace("parallel"))
eval(expression(
fastr.newSHAREDnode <- function(rank, options = defaultClusterOptions)
{
# Add the "debug" option defaulted to FALSE, if the user didn't specify
......@@ -63,10 +68,9 @@ fastr.newSHAREDnode <- function(rank, options = defaultClusterOptions)
channel <- fastr:::fastr.channel.create(port)
if (isTRUE(debug)) cat(sprintf("Context %d started!\n", rank))
structure(list(channel = channel, context=cx, rank = rank), class = "SHAREDnode")
}), asNamespace("parallel"))
}
eval(expression(
makeForkClusterExpr <- expression({
makeForkCluster <- function(nnodes = getOption("mc.cores", 2L), options = defaultClusterOptions, ...)
{
nnodes <- as.integer(nnodes)
......@@ -77,11 +81,15 @@ makeForkCluster <- function(nnodes = getOption("mc.cores", 2L), options = defaul
for (i in seq_along(cl)) cl[[i]] <- fastr.newSHAREDnode(rank=i, options=options)
class(cl) <- c("SHAREDcluster", "cluster")
cl
}), asNamespace("parallel"))
}; environment(makeForkCluster)<-asNamespace("parallel")})
eval(makeForkClusterExpr, asNamespace("parallel"))
eval(makeForkClusterExpr, as.environment("package:parallel"))
eval(expression(
stopCluster.SHAREDcluster <- function(cl) {
for (n in cl) {
parallel:::postNode(n, "DONE")
fastr:::fastr.context.join(n$context)
}
}
}), asNamespace("parallel"))
......@@ -15,15 +15,16 @@
## Derived from snow and parallel packages
eval(expression(
mc.set.children.streams <- function(cl)
{
if (RNGkind()[1L] == "L'Ecuyer-CMRG") {
clusterExport(cl, "LEcuyer.seed", envir = RNGenv)
clusterCall(cl, mc.set.stream)
}
}
}), asNamespace("parallel"))
eval(expression(
mclapplyExpr <- expression({
mclapply <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE,
mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
mc.cleanup = TRUE, mc.allow.recursive = TRUE)
......@@ -89,4 +90,6 @@ mclapply <- function(X, FUN, ..., mc.preschedule = TRUE, mc.set.seed = TRUE,
res[sindex[[i]]] <- job.res[seq((i-1)*len+1, i*len)]
}
res
}), asNamespace("parallel"))
}; environment(mclapply)<-asNamespace("parallel")})
eval(mclapplyExpr, asNamespace("parallel"))
eval(mclapplyExpr, as.environment("package:parallel"))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment