From 9f200e63f9ecedd3a1540e037f789f49d8a6bda9 Mon Sep 17 00:00:00 2001 From: Adam Welc <adam.welc@oracle.com> Date: Sun, 9 Aug 2015 19:15:53 -0700 Subject: [PATCH] Added (finally) the missing file. --- .../oracle/truffle/r/runtime/RChannel.java | 165 ++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/RChannel.java diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/RChannel.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/RChannel.java new file mode 100644 index 0000000000..49b668807d --- /dev/null +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/RChannel.java @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2015, 2015, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package com.oracle.truffle.r.runtime; + +import java.util.concurrent.*; + +import com.oracle.truffle.r.runtime.data.*; + +/** + * Implementation of a channel abstraction used for communication between parallel contexts in + * shared memory space. + */ +public class RChannel { + + private final static int INITIAL_CHANNEL_NUM = 4; + private final static int CHANNEL_NUM_GROW_FACTOR = 2; + private final static int QUEUE_CAPACITY = 1; + + private static int[] keys = new int[INITIAL_CHANNEL_NUM]; + private static RChannel[] channels = new RChannel[INITIAL_CHANNEL_NUM]; + + /* + * Used to mediate access to the semaphore instances + */ + private final static Semaphore create = new Semaphore(1, true); + + private final ArrayBlockingQueue<Object> masterToClient = new ArrayBlockingQueue<>(QUEUE_CAPACITY); + private final ArrayBlockingQueue<Object> clientToMaster = new ArrayBlockingQueue<>(QUEUE_CAPACITY); + + public static int createChannel(int key) { + if (key == 0) { + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "channel's key must be non-zero"); + } + try { + create.acquire(); + while (true) { + int freeSlot = -1; + // start from one as we need slots that have distinguishable positive and negative + // value + for (int i = 1; i < keys.length; i++) { + if (keys[i] == key) { + create.release(); + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "channel with specified key already exists"); + } + if (keys[i] == 0 && freeSlot == -1) { + freeSlot = i; + } + + } + if (freeSlot != -1) { + keys[freeSlot] = key; + channels[freeSlot] = new RChannel(); + create.release(); + return freeSlot; + } else { + int[] keysTmp = new int[keys.length * CHANNEL_NUM_GROW_FACTOR]; + RChannel[] channelsTmp = new RChannel[channels.length * CHANNEL_NUM_GROW_FACTOR]; + for (int i = 1; i < keys.length; i++) { + keysTmp[i] = keys[i]; + channelsTmp[i] = channels[i]; + } + keys = keysTmp; + channels = channelsTmp; + } + } + } catch (InterruptedException x) { + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "error creating a channel"); + } + } + + public static int getChannel(int key) { + try { + create.acquire(); + for (int i = 1; i < keys.length; i++) { + if (keys[i] == key) { + create.release(); + return -i; + } + } + create.release(); + } catch (InterruptedException x) { + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "error accessing channel"); + } + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "channel does not exist"); + } + + public static void closeChannel(int id) { + int actualId = Math.abs(id); + try { + create.acquire(); + if (actualId == 0 || actualId >= channels.length || channels[actualId] == null) { + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "channel with specified id does not exist"); + } + keys[actualId] = 0; + channels[actualId] = null; + create.release(); + } catch (InterruptedException x) { + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "error closing channel"); + } + } + + private static RChannel getChannelFromId(int id) { + int actualId = Math.abs(id); + try { + create.acquire(); + if (actualId == 0 || actualId >= channels.length || channels[actualId] == null) { + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "channel with specified id does not exist"); + } + RChannel channel = channels[actualId]; + create.release(); + return channel; + } catch (InterruptedException x) { + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "error transmitting through channel"); + } + } + + public static void send(int id, Object data) { + RChannel channel = getChannelFromId(id); + if (data instanceof RShareable) { + // make sure that what's passed through the channel will be copied on the first update + RShareable shareable = (RShareable) data; + if (FastROptions.NewStateTransition) { + shareable.incRefCount(); + shareable.incRefCount(); + } else { + shareable.makeShared(); + } + } + try { + (id > 0 ? channel.masterToClient : channel.clientToMaster).put(data); + } catch (InterruptedException x) { + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "error sending through the channel"); + } + } + + public static Object receive(int id) { + RChannel channel = getChannelFromId(id); + try { + return (id < 0 ? channel.masterToClient : channel.clientToMaster).take(); + } catch (InterruptedException x) { + throw RError.error(RError.NO_NODE, RError.Message.GENERIC, "error receiving from the channel"); + } + } + +} -- GitLab