Skip to content
Snippets Groups Projects
Commit de334fac authored by stepan's avatar stepan
Browse files

NFI based Connections.c implementation

parent 0f2663c5
Branches
No related tags found
No related merge requests found
Showing
with 401 additions and 318 deletions
...@@ -108,6 +108,7 @@ import com.oracle.truffle.r.runtime.ffi.DLL.CEntry; ...@@ -108,6 +108,7 @@ import com.oracle.truffle.r.runtime.ffi.DLL.CEntry;
import com.oracle.truffle.r.runtime.ffi.DLL.DLLInfo; import com.oracle.truffle.r.runtime.ffi.DLL.DLLInfo;
import com.oracle.truffle.r.runtime.ffi.DLL.DotSymbol; import com.oracle.truffle.r.runtime.ffi.DLL.DotSymbol;
import com.oracle.truffle.r.runtime.ffi.DLL.SymbolHandle; import com.oracle.truffle.r.runtime.ffi.DLL.SymbolHandle;
import com.oracle.truffle.r.runtime.ffi.UnsafeAdapter;
import com.oracle.truffle.r.runtime.gnur.SA_TYPE; import com.oracle.truffle.r.runtime.gnur.SA_TYPE;
import com.oracle.truffle.r.runtime.gnur.SEXPTYPE; import com.oracle.truffle.r.runtime.gnur.SEXPTYPE;
import com.oracle.truffle.r.runtime.nodes.DuplicationHelper; import com.oracle.truffle.r.runtime.nodes.DuplicationHelper;
...@@ -115,6 +116,8 @@ import com.oracle.truffle.r.runtime.nodes.RNode; ...@@ -115,6 +116,8 @@ import com.oracle.truffle.r.runtime.nodes.RNode;
import com.oracle.truffle.r.runtime.nodes.RSyntaxNode; import com.oracle.truffle.r.runtime.nodes.RSyntaxNode;
import com.oracle.truffle.r.runtime.rng.RRNG; import com.oracle.truffle.r.runtime.rng.RRNG;
import sun.misc.Unsafe;
/** /**
* This class provides a simple Java-based implementation of {@link UpCallsRFFI}, where all the * This class provides a simple Java-based implementation of {@link UpCallsRFFI}, where all the
* argument values are standard Java types, i.e. no special types used by Truffle NFI or Truffle * argument values are standard Java types, i.e. no special types used by Truffle NFI or Truffle
...@@ -1398,21 +1401,26 @@ public abstract class JavaUpCallsRFFIImpl implements UpCallsRFFI { ...@@ -1398,21 +1401,26 @@ public abstract class JavaUpCallsRFFIImpl implements UpCallsRFFI {
} }
@Override @Override
public int R_ReadConnection(int fd, Object bufObj) { public int R_ReadConnection(int fd, long bufAddress, int size) {
byte[] buf = (byte[]) bufObj; // Workaround using Unsafe until GR-5927 is fixed
byte[] buf = new byte[size];
int result = 0;
try (BaseRConnection fromIndex = RConnection.fromIndex(fd)) { try (BaseRConnection fromIndex = RConnection.fromIndex(fd)) {
Arrays.fill(buf, (byte) 0); Arrays.fill(buf, (byte) 0);
return fromIndex.readBin(ByteBuffer.wrap(buf)); result = fromIndex.readBin(ByteBuffer.wrap(buf));
} catch (IOException e) { } catch (IOException e) {
throw RError.error(RError.SHOW_CALLER, RError.Message.ERROR_READING_CONNECTION, e.getMessage()); throw RError.error(RError.SHOW_CALLER, RError.Message.ERROR_READING_CONNECTION, e.getMessage());
} }
UnsafeAdapter.UNSAFE.copyMemory(buf, Unsafe.ARRAY_BYTE_BASE_OFFSET, null, bufAddress, Math.min(result, size));
return result;
} }
@Override @Override
public int R_WriteConnection(int fd, Object bufObj) { public int R_WriteConnection(int fd, long bufAddress, int size) {
byte[] buf = (byte[]) bufObj; // Workaround using Unsafe until GR-5927 is fixed
byte[] buf = new byte[size];
UnsafeAdapter.UNSAFE.copyMemory(null, bufAddress, buf, Unsafe.ARRAY_BYTE_BASE_OFFSET, size);
try (BaseRConnection fromIndex = RConnection.fromIndex(fd)) { try (BaseRConnection fromIndex = RConnection.fromIndex(fd)) {
Arrays.fill(buf, (byte) 0);
final ByteBuffer wrapped = ByteBuffer.wrap(buf); final ByteBuffer wrapped = ByteBuffer.wrap(buf);
fromIndex.writeBin(wrapped); fromIndex.writeBin(wrapped);
return wrapped.position(); return wrapped.position();
......
...@@ -297,9 +297,9 @@ public interface StdUpCallsRFFI { ...@@ -297,9 +297,9 @@ public interface StdUpCallsRFFI {
Object R_new_custom_connection(@RFFICstring String description, @RFFICstring String mode, @RFFICstring String className, Object readAddr); Object R_new_custom_connection(@RFFICstring String description, @RFFICstring String mode, @RFFICstring String className, Object readAddr);
int R_ReadConnection(int fd, Object bufObj); int R_ReadConnection(int fd, long bufAddress, int size);
int R_WriteConnection(int fd, Object bufObj); int R_WriteConnection(int fd, long bufAddress, int size);
Object R_GetConnection(int fd); Object R_GetConnection(int fd);
......
...@@ -278,8 +278,22 @@ typedef double (*call_Rf_runif)(double x, double y); ...@@ -278,8 +278,22 @@ typedef double (*call_Rf_runif)(double x, double y);
typedef SEXP (*call_getvar)(); typedef SEXP (*call_getvar)();
// connections
typedef int (*call_FASTR_getConnectionChar)(SEXP connection); typedef int (*call_FASTR_getConnectionChar)(SEXP connection);
typedef int (*call_R_ReadConnection)(int fd, long bufAddress, int size);
typedef int (*call_R_WriteConnection)(int fd, long bufAddress, int size);
typedef SEXP (*call_R_new_custom_connection)(const char *description, const char *mode, const char *className, SEXP connAddrObj);
typedef SEXP (*call_R_GetConnection)(int fd);
typedef char* (*call_getConnectionClassString)(SEXP conn);
typedef char* (*call_getSummaryDescription)(SEXP conn);
typedef char* (*call_getOpenModeString)(SEXP conn);
typedef int (*call_isSeekable)(SEXP conn);
// symbols, dlls, etc.
typedef void (*call_registerRoutines)(DllInfo *dllInfo, int nstOrd, int num, const void* routines); typedef void (*call_registerRoutines)(DllInfo *dllInfo, int nstOrd, int num, const void* routines);
typedef int (*call_useDynamicSymbols)(DllInfo *dllInfo, Rboolean value); typedef int (*call_useDynamicSymbols)(DllInfo *dllInfo, Rboolean value);
......
...@@ -41,6 +41,7 @@ import com.oracle.truffle.r.runtime.data.RDoubleVector; ...@@ -41,6 +41,7 @@ import com.oracle.truffle.r.runtime.data.RDoubleVector;
import com.oracle.truffle.r.runtime.data.RExternalPtr; import com.oracle.truffle.r.runtime.data.RExternalPtr;
import com.oracle.truffle.r.runtime.data.RIntVector; import com.oracle.truffle.r.runtime.data.RIntVector;
import com.oracle.truffle.r.runtime.data.RLogicalVector; import com.oracle.truffle.r.runtime.data.RLogicalVector;
import com.oracle.truffle.r.runtime.data.RRawVector;
import com.oracle.truffle.r.runtime.ffi.CallRFFI; import com.oracle.truffle.r.runtime.ffi.CallRFFI;
import com.oracle.truffle.r.runtime.ffi.DLL; import com.oracle.truffle.r.runtime.ffi.DLL;
import com.oracle.truffle.r.runtime.ffi.DLL.DLLInfo; import com.oracle.truffle.r.runtime.ffi.DLL.DLLInfo;
...@@ -297,6 +298,14 @@ public class NativeConnections { ...@@ -297,6 +298,14 @@ public class NativeConnections {
private static class NativeChannel implements ByteChannel { private static class NativeChannel implements ByteChannel {
// Note: we wrap the ByteBuffer's array with a raw vector, which is on the native side
// converted to a pointer using RAW macro. This turns the raw vector into a native memory
// backed vector and any consecutive (write) operations in the native code are actually not
// done on the original vector that backs the byte buffer, so we need to copy back the date
// to the byte buffer. It would be more efficitent to use direct byte buffer, but then we'd
// need to make the native call interface (CallRFFI.InvokeCallRootNode) more flexible so
// that it can accept other argument types than SEXPs.
private final NativeRConnection base; private final NativeRConnection base;
NativeChannel(NativeRConnection base) { NativeChannel(NativeRConnection base) {
...@@ -307,16 +316,19 @@ public class NativeConnections { ...@@ -307,16 +316,19 @@ public class NativeConnections {
public int read(ByteBuffer dst) throws IOException { public int read(ByteBuffer dst) throws IOException {
NativeCallInfo ni = NativeConnections.getNativeFunctionInfo(READ_NATIVE_CONNECTION); NativeCallInfo ni = NativeConnections.getNativeFunctionInfo(READ_NATIVE_CONNECTION);
RootCallTarget nativeCallTarget = CallRFFI.InvokeCallRootNode.create().getCallTarget(); RootCallTarget nativeCallTarget = CallRFFI.InvokeCallRootNode.create().getCallTarget();
Object call = nativeCallTarget.call(ni, new Object[]{base.addr, dst.array(), dst.remaining()}); RRawVector bufferVec = RDataFactory.createRawVector(dst.remaining());
Object call = nativeCallTarget.call(ni, new Object[]{base.addr, bufferVec, dst.remaining()});
if (call instanceof RIntVector) { if (!(call instanceof RIntVector)) {
int nread = ((RIntVector) call).getDataAt(0); throw RInternalError.shouldNotReachHere("unexpected result type from native function, did the signature change?");
// update buffer's position !
dst.position(nread);
return nread;
} }
int nread = ((RIntVector) call).getDataAt(0);
throw RInternalError.shouldNotReachHere("unexpected result type"); if (nread > 0) {
// this should also update the buffer position
for (int i = 0; i < bufferVec.getLength(); i++) {
dst.put(bufferVec.getRawDataAt(i));
}
}
return nread;
} }
@Override @Override
...@@ -327,7 +339,7 @@ public class NativeConnections { ...@@ -327,7 +339,7 @@ public class NativeConnections {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
NativeCallInfo ni = NativeConnections.getNativeFunctionInfo(CLOSE_NATIVE_CONNECTION); NativeCallInfo ni = NativeConnections.getNativeFunctionInfo(CLOSE_NATIVE_CONNECTION);
RootCallTarget nativeCallTarget = CallRFFI.InvokeCallRootNode.create().getCallTarget(); RootCallTarget nativeCallTarget = CallRFFI.InvokeVoidCallRootNode.create().getCallTarget();
nativeCallTarget.call(ni, new Object[]{base.addr}); nativeCallTarget.call(ni, new Object[]{base.addr});
} }
...@@ -335,13 +347,15 @@ public class NativeConnections { ...@@ -335,13 +347,15 @@ public class NativeConnections {
public int write(ByteBuffer src) throws IOException { public int write(ByteBuffer src) throws IOException {
NativeCallInfo ni = NativeConnections.getNativeFunctionInfo(WRITE_NATIVE_CONNECTION); NativeCallInfo ni = NativeConnections.getNativeFunctionInfo(WRITE_NATIVE_CONNECTION);
RootCallTarget nativeCallTarget = CallRFFI.InvokeCallRootNode.create().getCallTarget(); RootCallTarget nativeCallTarget = CallRFFI.InvokeCallRootNode.create().getCallTarget();
ByteBuffer slice = src;
Object result = nativeCallTarget.call(ni, new Object[]{base.addr, src.array(), src.remaining()}); if (src.position() > 0) {
slice = src.slice();
}
RRawVector bufferVec = RDataFactory.createRawVector(slice.array());
Object result = nativeCallTarget.call(ni, new Object[]{base.addr, bufferVec, src.remaining()});
if (result instanceof RIntVector) { if (result instanceof RIntVector) {
return ((RIntVector) result).getDataAt(0); return ((RIntVector) result).getDataAt(0);
} }
throw RInternalError.shouldNotReachHere("unexpected result type"); throw RInternalError.shouldNotReachHere("unexpected result type");
} }
} }
......
...@@ -176,3 +176,15 @@ rffi.captureDotsWithSingleElement <- function(env) { ...@@ -176,3 +176,15 @@ rffi.captureDotsWithSingleElement <- function(env) {
rffi.evalAndNativeArrays <- function(vec, expr, env) { rffi.evalAndNativeArrays <- function(vec, expr, env) {
.Call('test_evalAndNativeArrays', vec, expr, env) .Call('test_evalAndNativeArrays', vec, expr, env)
} }
rffi.writeConnection <- function(connection) {
.Call('test_writeConnection', connection);
}
rffi.readConnection <- function(connection) {
.Call('test_readConnection', connection);
}
rffi.createNativeConnection <- function() {
.Call('test_createNativeConnection');
}
\ No newline at end of file
...@@ -77,6 +77,9 @@ static const R_CallMethodDef CallEntries[] = { ...@@ -77,6 +77,9 @@ static const R_CallMethodDef CallEntries[] = {
CALLDEF(test_stringNA, 0), CALLDEF(test_stringNA, 0),
CALLDEF(test_captureDotsWithSingleElement, 1), CALLDEF(test_captureDotsWithSingleElement, 1),
CALLDEF(test_evalAndNativeArrays, 3), CALLDEF(test_evalAndNativeArrays, 3),
CALLDEF(test_writeConnection, 1),
CALLDEF(test_readConnection, 1),
CALLDEF(test_createNativeConnection, 0),
{NULL, NULL, 0} {NULL, NULL, 0}
}; };
......
...@@ -28,6 +28,8 @@ ...@@ -28,6 +28,8 @@
#include <Rinterface.h> #include <Rinterface.h>
#include <Rinternals.h> #include <Rinternals.h>
#include <Rinterface.h> #include <Rinterface.h>
#include <R_ext/Connections.h>
#include <string.h>
#include "testrffi.h" #include "testrffi.h"
void dotCModifiedArguments(int* len, int* idata, double* rdata, int* ldata, char** cdata) { void dotCModifiedArguments(int* len, int* idata, double* rdata, int* ldata, char** cdata) {
...@@ -431,3 +433,91 @@ SEXP test_evalAndNativeArrays(SEXP vec, SEXP expr, SEXP env) { ...@@ -431,3 +433,91 @@ SEXP test_evalAndNativeArrays(SEXP vec, SEXP expr, SEXP env) {
UNPROTECT(uprotectCount); UNPROTECT(uprotectCount);
return vec; return vec;
} }
SEXP test_writeConnection(SEXP connVec) {
Rconnection connection = R_GetConnection(connVec);
char* greeting = "Hello from R_WriteConnection";
R_WriteConnection(connection, greeting, strlen(greeting));
return R_NilValue;
}
SEXP test_readConnection(SEXP connVec) {
Rconnection connection = R_GetConnection(connVec);
unsigned char buffer[255];
int size = R_ReadConnection(connection, buffer, 255);
SEXP result;
PROTECT(result = allocVector(RAWSXP, size));
unsigned char* resultData = RAW(result);
for (int i = 0; i < size; ++i) {
resultData[i] = buffer[i];
}
UNPROTECT(1);
return result;
}
static Rconnection customConn;
static void printNow(const char* message) {
puts(message);
fflush(stdout);
}
static void testrfficonn_destroy(Rconnection conn) {
if (conn != customConn) {
printNow("ERROR: destroy function did not receive expected argument\n");
} else {
printNow("Custom connection destroyed\n");
}
}
static Rboolean testrfficonn_open(Rconnection conn) {
if (conn != customConn) {
printNow("ERROR: open function did not receive expected argument\n");
return 0;
} else {
printNow("Custom connection opened\n");
return 1;
}
}
static void testrfficonn_close(Rconnection conn) {
if (conn != customConn) {
printNow("ERROR: close function did not receive expected argument\n");
} else {
printNow("Custom connection closed\n");
}
}
static size_t testrfficonn_write(const void * message, size_t size, size_t nitems, Rconnection conn) {
if (conn != customConn) {
printNow("ERROR: write function did not receive expected argument\n");
return 0;
} else {
printf("Custom connection printing: %.*s\n", (int) (size * nitems), (char*) message);
fflush(stdout);
return size * nitems;
}
}
static size_t testrfficonn_read(void *buffer, size_t size, size_t niterms, Rconnection conn) {
if (conn != customConn) {
printNow("ERROR: read function did not receive expected argument\n");
return 0;
} else if (size * niterms > 0) {
((char *)buffer)[0] = 'Q';
return 1;
}
return 0;
}
SEXP test_createNativeConnection() {
SEXP newConnSEXP = R_new_custom_connection("Connection for testing purposes", "w", "testrfficonn", &customConn);
customConn->isopen = 0;
customConn->canwrite = 1;
customConn->destroy = &testrfficonn_destroy;
customConn->open = &testrfficonn_open;
customConn->close = &testrfficonn_close;
customConn->write = &testrfficonn_write;
// customConn->read = &testrfficonn_read; TODO: read test
return newConnSEXP;
}
...@@ -97,3 +97,9 @@ extern SEXP test_stringNA(void); ...@@ -97,3 +97,9 @@ extern SEXP test_stringNA(void);
extern SEXP test_captureDotsWithSingleElement(SEXP env); extern SEXP test_captureDotsWithSingleElement(SEXP env);
extern SEXP test_evalAndNativeArrays(SEXP vec, SEXP expr, SEXP env); extern SEXP test_evalAndNativeArrays(SEXP vec, SEXP expr, SEXP env);
extern SEXP test_writeConnection(SEXP conn);
extern SEXP test_readConnection(SEXP conn);
extern SEXP test_createNativeConnection(void);
# Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
#
# This code is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 2 only, as
# published by the Free Software Foundation.
#
# This code is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# version 2 for more details (a copy is included in the LICENSE file that
# accompanied this code).
#
# You should have received a copy of the GNU General Public License version
# 2 along with this work; if not, write to the Free Software Foundation,
# Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
# or visit www.oracle.com if you need additional information or have any
# questions.
stopifnot(require(testrffi))
# invoking the R_WriteConnection and R_ReadConnection from native code
zz <- file("testbinconn", "wb")
rffi.writeConnection(zz)
close(zz)
readLines("testbinconn")
zz <- file("testbinconn", "rb")
rffi.readConnection(zz);
unlink("testbinconn")
# custom connections implemented in native code
nativeConn <- rffi.createNativeConnection()
nativeConn
writeChar("Hello to custom native connections!\n", nativeConn)
# TODO: close ignored -- FastR does not call destroy
# close(nativeConn)
# TODO: read test is not working
# FastR reports: Error in readChar(nativeConn2, 42) : invalid connection
# nativeConn2 <- rffi.createNativeConnection()
# readChar(nativeConn2, 42)
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment