diff --git a/com.oracle.truffle.r.library/src/com/oracle/truffle/r/library/fastr/FastRContext.java b/com.oracle.truffle.r.library/src/com/oracle/truffle/r/library/fastr/FastRContext.java index b5b3b0cf66c61195d61d16a118b4ad71273b0d06..83a6bc602be0e0d9f598fb7280b999b2a706ca63 100644 --- a/com.oracle.truffle.r.library/src/com/oracle/truffle/r/library/fastr/FastRContext.java +++ b/com.oracle.truffle.r.library/src/com/oracle/truffle/r/library/fastr/FastRContext.java @@ -80,6 +80,27 @@ public class FastRContext { } } + public abstract static class Join extends RExternalBuiltinNode.Arg1 { + @Specialization + protected RNull eval(RIntVector contexts) { + try { + for (int i = 0; i < contexts.getLength(); i++) { + RContext context = RContext.find(contexts.getDataAt(i)); + if (context == null) { + // already done + continue; + } else { + context.joinThread(); + } + } + } catch (InterruptedException ex) { + throw RError.error(this, RError.Message.GENERIC, "error finishing eval thread"); + + } + return RNull.instance; + } + } + public abstract static class Eval extends RExternalBuiltinNode.Arg3 { @Specialization protected RNull eval(RIntVector contexts, RAbstractStringVector exprs, byte par) { @@ -97,7 +118,7 @@ public class FastRContext { threads[i].join(); } } catch (InterruptedException ex) { - + throw RError.error(this, RError.Message.GENERIC, "error finishing eval thread"); } } else { for (int i = 0; i < contexts.getLength(); i++) { diff --git a/com.oracle.truffle.r.native/library/fastr/src/NAMESPACE b/com.oracle.truffle.r.native/library/fastr/src/NAMESPACE index 279c0d1878a7940fa797829088074a72526280c0..ab0090b5af4eab4bbc16cd707eb132c49d9c865c 100644 --- a/com.oracle.truffle.r.native/library/fastr/src/NAMESPACE +++ b/com.oracle.truffle.r.native/library/fastr/src/NAMESPACE @@ -13,6 +13,7 @@ export(fastr.createpkgsources) export(fastr.createpkgsource) export(fastr.context.create) export(fastr.context.spawn) +export(fastr.context.join) export(fastr.context.eval) export(fastr.context.pareval) export(print.fastr_context) diff --git a/com.oracle.truffle.r.native/library/fastr/src/R/fastr.R b/com.oracle.truffle.r.native/library/fastr/src/R/fastr.R index bea405893fcf6e71964584f2296d6c5050843d04..d782c83b104bdb686dec7a1dffce65a3f161c9c7 100644 --- a/com.oracle.truffle.r.native/library/fastr/src/R/fastr.R +++ b/com.oracle.truffle.r.native/library/fastr/src/R/fastr.R @@ -90,6 +90,11 @@ fastr.context.spawn <- function(contexts, exprs) { invisible(NULL) } +fastr.context.join <- function(contexts) { + .FastR(.NAME="context.join", contexts) + invisible(NULL) +} + fastr.context.eval <- function(contexts, exprs, par=FALSE) { .FastR(.NAME="context.eval", contexts, exprs, par) invisible(NULL) diff --git a/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/base/foreign/FastR.java b/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/base/foreign/FastR.java index 02044b58461a11fd0dd4d55d14aff799f78b1e6e..c8f9b77579392aa2d219c2afc57a6cad0b905116 100644 --- a/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/base/foreign/FastR.java +++ b/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/base/foreign/FastR.java @@ -88,6 +88,8 @@ public abstract class FastR extends RBuiltinNode { return FastRContextFactory.PrintNodeGen.create(); case "context.spawn": return FastRContextFactory.SpawnNodeGen.create(); + case "context.join": + return FastRContextFactory.JoinNodeGen.create(); case "context.eval": return FastRContextFactory.EvalNodeGen.create(); case "fastr.channel.create": diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/RContext.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/RContext.java index 868a8822192fecca6f8ea0721e09f0239b1269eb..17182ac2d0d90b096ba074fa0a3c561a7e1ad4c7 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/RContext.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/RContext.java @@ -447,6 +447,7 @@ public final class RContext extends ExecutionContext { public EvalThread(RContext context, Source source) { super(context); this.source = source; + context.evalThread = this; } @Override @@ -508,6 +509,11 @@ public final class RContext extends ExecutionContext { */ private RContext sharedChild; + /** + * Back pointer to the evalThread. + */ + private EvalThread evalThread; + /** * Typically there is a 1-1 relationship between an {@link RContext} and the thread that is * performing the evaluation, so we can store the {@link RContext} in a {@link ThreadLocal}. @@ -534,6 +540,8 @@ public final class RContext extends ExecutionContext { private static final Deque<RContext> allContexts = new ConcurrentLinkedDeque<>(); + private static final Semaphore allContextsSemaphore = new Semaphore(1, true); + /** * A (hopefully) temporary workaround to ignore the setting of {@link #resultVisible} for * benchmarks. Set across all contexts. @@ -567,6 +575,20 @@ public final class RContext extends ExecutionContext { } } + /** + * Waits for the associated EvalThread to finish + * + * @throws InterruptedException + */ + public void joinThread() throws InterruptedException { + EvalThread t = this.evalThread; + if (t == null) { + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "no eval thread in a given context"); + } + this.evalThread = null; + t.join(); + } + private static final Assumption singleContextAssumption = Truffle.getRuntime().createAssumption("single RContext"); @CompilationFinal private static RContext singleContext; @@ -586,7 +608,13 @@ public final class RContext extends ExecutionContext { } this.consoleHandler = consoleHandler; this.interactive = consoleHandler.isInteractive(); - allContexts.add(this); + try { + allContextsSemaphore.acquire(); + allContexts.add(this); + allContextsSemaphore.release(); + } catch (InterruptedException x) { + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "error destroying context"); + } if (singleContextAssumption.isValid()) { if (singleContext == null) { @@ -699,7 +727,13 @@ public final class RContext extends ExecutionContext { parent.sharedChild = null; } engine = null; - allContexts.remove(this); + try { + allContextsSemaphore.acquire(); + allContexts.remove(this); + allContextsSemaphore.release(); + } catch (InterruptedException x) { + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "error destroying context"); + } if (parent == null) { threadLocalContext.set(null); } else { @@ -720,10 +754,17 @@ public final class RContext extends ExecutionContext { } public static RContext find(int id) { - for (RContext context : allContexts) { - if (context.id == id) { - return context; + try { + allContextsSemaphore.acquire(); + for (RContext context : allContexts) { + if (context.id == id) { + allContextsSemaphore.release(); + return context; + } } + allContextsSemaphore.release(); + } catch (InterruptedException x) { + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "error destroying context"); } return null; }