diff --git a/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/fastr/FastRContext.java b/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/fastr/FastRContext.java index 3594b6ceaaeb548248103ca47531d29805e995b6..ab4b29c69b4a68cb276a54dde695863766b9e58c 100644 --- a/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/fastr/FastRContext.java +++ b/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/fastr/FastRContext.java @@ -96,7 +96,7 @@ public class FastRContext { } private static void handleSharedContexts(ContextKind contextKind) { - if (contextKind == ContextKind.SHARE_ALL) { + if (contextKind == ContextKind.SHARE_ALL && RContext.EvalThread.threadCnt.get() == 0) { RContext current = RContext.getInstance(); if (EvalThread.threadCnt.get() == 0 && (current.isInitial() || current.getKind() == ContextKind.SHARE_PARENT_RW)) { ContextInfo.resetMultiSlotIndexGenerator(); @@ -137,13 +137,15 @@ public class FastRContext { int length = exprs.getLength(); RContext.EvalThread[] threads = new RContext.EvalThread[length]; int[] data = new int[length]; + int[] multiSlotIndices = new int[length]; for (int i = 0; i < length; i++) { ContextInfo info = createContextInfo(contextKind); threads[i] = new RContext.EvalThread(info, RSource.fromTextInternalInvisible(exprs.getDataAt(i % exprs.getLength()), RSource.Internal.CONTEXT_EVAL)); data[i] = info.getId(); + multiSlotIndices[i] = info.getMultiSlotInd(); } if (contextKind == ContextKind.SHARE_ALL) { - REnvironment.convertSearchpathToMultiSlot(); + REnvironment.convertSearchpathToMultiSlot(multiSlotIndices); } for (int i = 0; i < length; i++) { threads[i].start(); @@ -167,8 +169,13 @@ public class FastRContext { @TruffleBoundary protected RNull eval(RAbstractIntVector handle) { try { + int[] multiSlotIndices = new int[handle.getLength()]; for (int i = 0; i < handle.getLength(); i++) { - Thread thread = RContext.EvalThread.threads.get(handle.getDataAt(i)); + int id = handle.getDataAt(i); + Thread thread = RContext.EvalThread.threads.get(id); + if (RContext.EvalThread.idToMultiSlotTable.containsKey(id)) { + multiSlotIndices[i] = RContext.EvalThread.idToMultiSlotTable.remove(id); + } if (thread == null) { // already done continue; @@ -176,6 +183,12 @@ public class FastRContext { thread.join(); } } + // If all eval threads died, completely remove multi slot data. + if (RContext.EvalThread.threadCnt.get() == 0) { + REnvironment.cleanupSearchpathFromMultiSlot(); + } else { + REnvironment.cleanupSearchpathFromMultiSlot(multiSlotIndices); + } } catch (InterruptedException ex) { throw error(RError.Message.GENERIC, "error finishing eval thread"); @@ -230,12 +243,14 @@ public class FastRContext { } else { // separate threads that run in parallel; invoking thread waits for completion RContext.EvalThread[] threads = new RContext.EvalThread[length]; + int[] multiSlotIndices = new int[length]; for (int i = 0; i < length; i++) { ContextInfo info = createContextInfo(contextKind); threads[i] = new RContext.EvalThread(info, RSource.fromTextInternalInvisible(exprs.getDataAt(i % exprs.getLength()), RSource.Internal.CONTEXT_EVAL)); + multiSlotIndices[i] = info.getMultiSlotInd(); } if (contextKind == ContextKind.SHARE_ALL) { - REnvironment.convertSearchpathToMultiSlot(); + REnvironment.convertSearchpathToMultiSlot(multiSlotIndices); } for (int i = 0; i < length; i++) { threads[i].start(); diff --git a/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/parallel/R/forkcluster_overrides.R b/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/parallel/R/forkcluster_overrides.R index 02940b6a2aa6d27bf3fc7b34d79558e58f26a8c3..ef6706e12aabf46ad4405a29bbee45cc2b61f442 100644 --- a/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/parallel/R/forkcluster_overrides.R +++ b/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/parallel/R/forkcluster_overrides.R @@ -55,9 +55,23 @@ newSHAREDnodes <- function(nnodes, debug, options = defaultClusterOptions) { contexts <- vector("integer", nnodes) channels <- vector("integer", nnodes) outfile <- getClusterOption("outfile", options) + + # get initial port number + port <- as.integer(parallel:::getClusterOption("port", options)) + + # find first unused port number + continue <- TRUE + firstUnused <- 0 + while(continue) { + tryCatch({ + .fastr.channel.get(port + (firstUnused + 1) * 1000) + firstUnused <- firstUnused + 1 + }, error = function(e) { continue <<- FALSE }) + } + for (i in 1:nnodes) { # generate unique values for channel keys (addition factor is chosen based on how snow generates port numbers) - port <- as.integer(parallel:::getClusterOption("port", options) + i * 1000) + port <- as.integer(parallel:::getClusterOption("port", options) + (i + firstUnused) * 1000) startup <- substitute(local({ makeSHAREDmaster <- function(key) { diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/context/RContext.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/context/RContext.java index affd6bef10ef5dc829e42e72a9bca1b7211eb572..c33386c9c3962596f8b75daa7693f508a280e77d 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/context/RContext.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/context/RContext.java @@ -230,6 +230,9 @@ public final class RContext extends com.oracle.truffle.api.ExecutionContext impl public static final Map<Integer, Thread> threads = new ConcurrentHashMap<>(); + /** This table is required to create several bunches of child contexts. */ + public static final Map<Integer, Integer> idToMultiSlotTable = new ConcurrentHashMap<>(); + /** We use a separate counter for threads since ConcurrentHashMap.size() is not reliable. */ public static final AtomicInteger threadCnt = new AtomicInteger(0); @@ -239,6 +242,7 @@ public final class RContext extends com.oracle.truffle.api.ExecutionContext impl this.source = source; threadCnt.incrementAndGet(); threads.put(info.getId(), this); + idToMultiSlotTable.put(info.getId(), info.getMultiSlotInd()); } @Override @@ -333,6 +337,10 @@ public final class RContext extends com.oracle.truffle.api.ExecutionContext impl public RList getEvalResult() { return evalResult; } + + public ContextInfo getContextInfo() { + return info; + } } private final ContextInfo info; diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/env/REnvironment.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/env/REnvironment.java index b8ba2fe1c75518f24ba5f714d69ba7077ecbbd99..4cfd1edc38959a723768a34df848d46e987c584f 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/env/REnvironment.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/env/REnvironment.java @@ -910,32 +910,70 @@ public abstract class REnvironment extends RAttributeStorage { return getPrintName(); } - public static void convertSearchpathToMultiSlot() { + public static <T> void applyToSearchPath(SearchPathFun fun) { CompilerAsserts.neverPartOfCompilation(); RContext.markNonSingle(); ContextStateImpl parentState = RContext.getInstance().stateREnvironment; SearchPath searchPath = parentState.getSearchPath(); assert searchPath.size() > 0 && searchPath.get(0).getSearchName() == Global.SEARCHNAME; // for global space don't replicate entries as all contexts should see their own values - FrameSlotChangeMonitor.handleAllMultiSlots(searchPath.get(0).getFrame(), false); + fun.apply(searchPath.get(0).getFrame(), false); for (int i = 1; i < searchPath.size(); i++) { - FrameSlotChangeMonitor.handleAllMultiSlots(searchPath.get(i).getFrame(), true); + fun.apply(searchPath.get(i).getFrame(), true); } REnvironment namespaces = parentState.namespaceRegistry; Frame namespacesFrame = namespaces.getFrame(); + fun.apply(namespacesFrame, true); // make a copy avoid potential updates to the array iterated over FrameSlot[] slots = new FrameSlot[namespacesFrame.getFrameDescriptor().getSlots().size()]; slots = namespacesFrame.getFrameDescriptor().getSlots().toArray(slots); for (int i = 0; i < slots.length; i++) { - REnvironment namespaceEnv = (REnvironment) namespacesFrame.getValue(slots[i]); + REnvironment namespaceEnv = (REnvironment) FrameSlotChangeMonitor.getValue(slots[i], namespacesFrame); if (namespaceEnv != Base.baseNamespaceEnv()) { // base namespace frame redirects all accesses to base frame and this would // result in processing the slots twice - FrameSlotChangeMonitor.handleAllMultiSlots(namespaceEnv.getFrame(), true); + fun.apply(namespaceEnv.getFrame(), true); } } } + private interface SearchPathFun { + void apply(Frame frame, boolean replicate); + } + + public static void convertSearchpathToMultiSlot(int[] indices) { + applyToSearchPath(new SearchPathFun() { + + @Override + public void apply(Frame frame, boolean replicate) { + FrameSlotChangeMonitor.handleAllMultiSlots(frame, indices, replicate); + } + + }); + } + + public static void cleanupSearchpathFromMultiSlot() { + applyToSearchPath(new SearchPathFun() { + + @Override + public void apply(Frame frame, boolean replicate) { + FrameSlotChangeMonitor.cleanMultiSlots(frame, null); + } + + }); + } + + public static void cleanupSearchpathFromMultiSlot(int[] multiSlotIndices) { + applyToSearchPath(new SearchPathFun() { + + @Override + public void apply(Frame frame, boolean replicate) { + FrameSlotChangeMonitor.cleanMultiSlots(frame, multiSlotIndices); + } + + }); + } + private static final class BaseNamespace extends REnvironment { private BaseNamespace(String name, MaterializedFrame frame) { super(name, frame); diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/env/frame/FrameSlotChangeMonitor.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/env/frame/FrameSlotChangeMonitor.java index 8c7e60ef51c8b7702fac9021c1b41902a2c9d1fa..813377fc39614ef4387f9e30d1512d7dba1db096 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/env/frame/FrameSlotChangeMonitor.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/env/frame/FrameSlotChangeMonitor.java @@ -460,7 +460,15 @@ public final class FrameSlotChangeMonitor { public static final class MultiSlotData { - private final Object[] data = new Object[ContextInfo.contextNum()]; + private final Object[] data; + + public MultiSlotData(MultiSlotData prevValue) { + data = Arrays.copyOf(prevValue.data, ContextInfo.contextNum()); + } + + public MultiSlotData() { + data = new Object[ContextInfo.contextNum()]; + } public Object get(int ind) { return data[ind]; @@ -600,14 +608,13 @@ public final class FrameSlotChangeMonitor { } } - public static void handleSearchPathMultiSlot(Frame frame, FrameSlot slot, boolean replicate) { + public static void handleSearchPathMultiSlot(Frame frame, FrameSlot slot, int[] indices, boolean replicate) { CompilerAsserts.neverPartOfCompilation(); while (true) { FrameSlotInfoImpl info = (FrameSlotInfoImpl) slot.getInfo(); if (info.stableValue == null || !replicate) { // create a multi slot for slots whose stableValue is null but also for all - // slots of - // the global frame (which are marked as !replicate) + // slots of the global frame (which are marked as !replicate) info.stableValue = null; info.nonLocalModifiedAssumption.invalidate(); info.noMultiSlot.invalidate(); @@ -617,20 +624,28 @@ public final class FrameSlotChangeMonitor { // TODO: do we have to worry that prevValue can be invalid? if (prevValue instanceof MultiSlotData) { // this handles the case when we create share contexts for the second time - - // existing multi slots are an artifact of the previous executions and must - // be - // discarded - // TOOD: consider re-using multi slots but since we don't expect many of - // them, - // perhaps it's too much work for too little gain + // existing multi slots are an artifact of a previous executions and must + // be extended + // TODO: consider re-using multi slots but since we don't expect many of + // them, perhaps it's too much work for too little gain + data = new MultiSlotData((MultiSlotData) prevValue); prevValue = ((MultiSlotData) prevValue).get(0); - } else if (FastROptions.SearchPathForcePromises.getBooleanValue()) { - prevValue = RContext.getRRuntimeASTAccess().forcePromise("searchPathPromiseForce", prevValue); - } - if (replicate) { - data.setAll(prevValue); + + // replicate value only for newly created child contexts + if (replicate) { + for (int i : indices) { + data.set(i, prevValue); + } + } } else { - data.set(0, prevValue); + if (FastROptions.SearchPathForcePromises.getBooleanValue()) { + prevValue = RContext.getRRuntimeASTAccess().forcePromise("searchPathPromiseForce", prevValue); + } + if (replicate) { + data.setAll(prevValue); + } else { + data.set(0, prevValue); + } } frame.setObject(slot, data); break; @@ -920,13 +935,13 @@ public final class FrameSlotChangeMonitor { /* * This method should be called for frames of all environments on the search path. */ - public static void handleAllMultiSlots(Frame frame, boolean replicate) { + public static void handleAllMultiSlots(Frame frame, int[] indices, boolean replicate) { // make a copy avoid potential updates to the array iterated over FrameSlot[] slots = new FrameSlot[frame.getFrameDescriptor().getSlots().size()]; slots = frame.getFrameDescriptor().getSlots().toArray(slots); for (int i = 0; i < slots.length; i++) { if (!(slots[i].getIdentifier() instanceof RFrameSlot)) { - FrameSlotInfoImpl.handleSearchPathMultiSlot(frame, slots[i], replicate); + FrameSlotInfoImpl.handleSearchPathMultiSlot(frame, slots[i], indices, replicate); } } } @@ -974,4 +989,34 @@ public final class FrameSlotChangeMonitor { return frame.getValue(slot); } } + + /** + * Nullifies a set of slots in a {@link MultiSlotData} to avoid memory leaks. When providing + * {@code null} as indices, alls subslots except the first one are nullified. + */ + public static void cleanMultiSlots(Frame frame, int[] indices) { + CompilerAsserts.neverPartOfCompilation(); + // make a copy avoid potential updates to the array iterated over + FrameSlot[] slots = frame.getFrameDescriptor().getSlots().toArray(new FrameSlot[0]); + + for (int i = 0; i < slots.length; i++) { + if (!(slots[i].getIdentifier() instanceof RFrameSlot)) { + Object value = frame.getValue(slots[i]); + if (value instanceof MultiSlotData) { + MultiSlotData msd = (MultiSlotData) value; + if (indices != null) { + for (int j = 0; j < indices.length; j++) { + assert indices[j] != 0; + msd.set(indices[j], null); + } + } else { + // only safe value of primordial context + Object initialValue = msd.get(0); + msd.setAll(null); + msd.set(0, initialValue); + } + } + } + } + } } diff --git a/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/ExpectedTestOutput.test b/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/ExpectedTestOutput.test index 5c89154642282ef8112e8636bce9fda6e47c3a4a..b1726a12cfb2df32babfc62ba312c5052b0d7ebd 100644 --- a/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/ExpectedTestOutput.test +++ b/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/ExpectedTestOutput.test @@ -130547,6 +130547,10 @@ a b c d e #{ source("mxbuild/com.oracle.truffle.r.test/bin/com/oracle/truffle/r/test/channels/R/channels9.R") } [1] TRUE FALSE +##com.oracle.truffle.r.test.library.fastr.TestChannels.runRSourceTests# +#{ source("mxbuild/com.oracle.truffle.r.test/bin/com/oracle/truffle/r/test/channels/R/contexts1.R") } +[1] 7 42 + ##com.oracle.truffle.r.test.library.fastr.TestChannels.runRSourceTests# #{ source("mxbuild/com.oracle.truffle.r.test/bin/com/oracle/truffle/r/test/channels/R/sharing1.R") } [1] 42 7 diff --git a/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/channels/R/contexts1.R b/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/channels/R/contexts1.R new file mode 100644 index 0000000000000000000000000000000000000000..f8fe9ce47b22dda2bbd4f4cfc79c5ac6d78f1472 --- /dev/null +++ b/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/channels/R/contexts1.R @@ -0,0 +1,17 @@ +if (length(grep("FastR", R.Version()$version.string)) == 1) { + ch0 <- .fastr.channel.create(1L) + ch1 <- .fastr.channel.create(2L) + code0 <- "ch <- .fastr.channel.get(1L); .fastr.channel.send(ch, 7L)" + code1 <- "ch <- .fastr.channel.get(2L); .fastr.channel.send(ch, 42L)" + cx0 <- .fastr.context.spawn(code0) + cx1 <- .fastr.context.spawn(code1) + x<-.fastr.channel.receive(ch0) + y<-.fastr.channel.receive(ch1) + .fastr.context.join(cx0) + .fastr.context.join(cx1) + .fastr.channel.close(ch0) + .fastr.channel.close(ch1) + print(c(x,y)) +} else { + print(c(7L, 42L)) +}