From de334fac4aa8460a8cd23ab1995d5f2b092de377 Mon Sep 17 00:00:00 2001
From: stepan <stepan.sindelar@oracle.com>
Date: Mon, 11 Sep 2017 15:00:53 +0200
Subject: [PATCH] NFI based Connections.c implementation

---
 .../ffi/impl/common/JavaUpCallsRFFIImpl.java  |  20 +-
 .../r/ffi/impl/upcalls/StdUpCallsRFFI.java    |   4 +-
 .../fficall/src/common/rffi_upcalls.h         |  14 +
 .../fficall/src/truffle_common/Connections.c  | 484 +++++++-----------
 .../r/runtime/conn/NativeConnections.java     |  42 +-
 .../packages/testrffi/testrffi/R/testrffi.R   |  12 +
 .../packages/testrffi/testrffi/src/init.c     |   3 +
 .../packages/testrffi/testrffi/src/testrffi.c |  90 ++++
 .../packages/testrffi/testrffi/src/testrffi.h |   6 +
 .../testrffi/testrffi/tests/connections.R     |  44 ++
 10 files changed, 401 insertions(+), 318 deletions(-)
 create mode 100644 com.oracle.truffle.r.test.native/packages/testrffi/testrffi/tests/connections.R

diff --git a/com.oracle.truffle.r.ffi.impl/src/com/oracle/truffle/r/ffi/impl/common/JavaUpCallsRFFIImpl.java b/com.oracle.truffle.r.ffi.impl/src/com/oracle/truffle/r/ffi/impl/common/JavaUpCallsRFFIImpl.java
index df5375d70c..ebb04b5f27 100644
--- a/com.oracle.truffle.r.ffi.impl/src/com/oracle/truffle/r/ffi/impl/common/JavaUpCallsRFFIImpl.java
+++ b/com.oracle.truffle.r.ffi.impl/src/com/oracle/truffle/r/ffi/impl/common/JavaUpCallsRFFIImpl.java
@@ -108,6 +108,7 @@ import com.oracle.truffle.r.runtime.ffi.DLL.CEntry;
 import com.oracle.truffle.r.runtime.ffi.DLL.DLLInfo;
 import com.oracle.truffle.r.runtime.ffi.DLL.DotSymbol;
 import com.oracle.truffle.r.runtime.ffi.DLL.SymbolHandle;
+import com.oracle.truffle.r.runtime.ffi.UnsafeAdapter;
 import com.oracle.truffle.r.runtime.gnur.SA_TYPE;
 import com.oracle.truffle.r.runtime.gnur.SEXPTYPE;
 import com.oracle.truffle.r.runtime.nodes.DuplicationHelper;
@@ -115,6 +116,8 @@ import com.oracle.truffle.r.runtime.nodes.RNode;
 import com.oracle.truffle.r.runtime.nodes.RSyntaxNode;
 import com.oracle.truffle.r.runtime.rng.RRNG;
 
+import sun.misc.Unsafe;
+
 /**
  * This class provides a simple Java-based implementation of {@link UpCallsRFFI}, where all the
  * argument values are standard Java types, i.e. no special types used by Truffle NFI or Truffle
@@ -1398,21 +1401,26 @@ public abstract class JavaUpCallsRFFIImpl implements UpCallsRFFI {
     }
 
     @Override
-    public int R_ReadConnection(int fd, Object bufObj) {
-        byte[] buf = (byte[]) bufObj;
+    public int R_ReadConnection(int fd, long bufAddress, int size) {
+        // Workaround using Unsafe until GR-5927 is fixed
+        byte[] buf = new byte[size];
+        int result = 0;
         try (BaseRConnection fromIndex = RConnection.fromIndex(fd)) {
             Arrays.fill(buf, (byte) 0);
-            return fromIndex.readBin(ByteBuffer.wrap(buf));
+            result = fromIndex.readBin(ByteBuffer.wrap(buf));
         } catch (IOException e) {
             throw RError.error(RError.SHOW_CALLER, RError.Message.ERROR_READING_CONNECTION, e.getMessage());
         }
+        UnsafeAdapter.UNSAFE.copyMemory(buf, Unsafe.ARRAY_BYTE_BASE_OFFSET, null, bufAddress, Math.min(result, size));
+        return result;
     }
 
     @Override
-    public int R_WriteConnection(int fd, Object bufObj) {
-        byte[] buf = (byte[]) bufObj;
+    public int R_WriteConnection(int fd, long bufAddress, int size) {
+        // Workaround using Unsafe until GR-5927 is fixed
+        byte[] buf = new byte[size];
+        UnsafeAdapter.UNSAFE.copyMemory(null, bufAddress, buf, Unsafe.ARRAY_BYTE_BASE_OFFSET, size);
         try (BaseRConnection fromIndex = RConnection.fromIndex(fd)) {
-            Arrays.fill(buf, (byte) 0);
             final ByteBuffer wrapped = ByteBuffer.wrap(buf);
             fromIndex.writeBin(wrapped);
             return wrapped.position();
diff --git a/com.oracle.truffle.r.ffi.impl/src/com/oracle/truffle/r/ffi/impl/upcalls/StdUpCallsRFFI.java b/com.oracle.truffle.r.ffi.impl/src/com/oracle/truffle/r/ffi/impl/upcalls/StdUpCallsRFFI.java
index 0f495700d6..feda9ade15 100644
--- a/com.oracle.truffle.r.ffi.impl/src/com/oracle/truffle/r/ffi/impl/upcalls/StdUpCallsRFFI.java
+++ b/com.oracle.truffle.r.ffi.impl/src/com/oracle/truffle/r/ffi/impl/upcalls/StdUpCallsRFFI.java
@@ -297,9 +297,9 @@ public interface StdUpCallsRFFI {
 
     Object R_new_custom_connection(@RFFICstring String description, @RFFICstring String mode, @RFFICstring String className, Object readAddr);
 
-    int R_ReadConnection(int fd, Object bufObj);
+    int R_ReadConnection(int fd, long bufAddress, int size);
 
-    int R_WriteConnection(int fd, Object bufObj);
+    int R_WriteConnection(int fd, long bufAddress, int size);
 
     Object R_GetConnection(int fd);
 
diff --git a/com.oracle.truffle.r.native/fficall/src/common/rffi_upcalls.h b/com.oracle.truffle.r.native/fficall/src/common/rffi_upcalls.h
index a2bb060620..3e7d642615 100644
--- a/com.oracle.truffle.r.native/fficall/src/common/rffi_upcalls.h
+++ b/com.oracle.truffle.r.native/fficall/src/common/rffi_upcalls.h
@@ -278,8 +278,22 @@ typedef double (*call_Rf_runif)(double x, double y);
 
 typedef SEXP (*call_getvar)();
 
+// connections
+
 typedef int (*call_FASTR_getConnectionChar)(SEXP connection);
+typedef int (*call_R_ReadConnection)(int fd, long bufAddress, int size);
+typedef int (*call_R_WriteConnection)(int fd, long bufAddress, int size);
+
+typedef SEXP (*call_R_new_custom_connection)(const char *description, const char *mode, const char *className, SEXP connAddrObj);
+
+typedef SEXP (*call_R_GetConnection)(int fd);
+typedef char* (*call_getConnectionClassString)(SEXP conn);
+typedef char* (*call_getSummaryDescription)(SEXP conn);
+typedef char* (*call_getOpenModeString)(SEXP conn);
+typedef int (*call_isSeekable)(SEXP conn);
+
 
+// symbols, dlls, etc.
 
 typedef void (*call_registerRoutines)(DllInfo *dllInfo, int nstOrd, int num, const void* routines);
 typedef int (*call_useDynamicSymbols)(DllInfo *dllInfo, Rboolean value);
diff --git a/com.oracle.truffle.r.native/fficall/src/truffle_common/Connections.c b/com.oracle.truffle.r.native/fficall/src/truffle_common/Connections.c
index ffdf80fde5..8a179cbe7c 100644
--- a/com.oracle.truffle.r.native/fficall/src/truffle_common/Connections.c
+++ b/com.oracle.truffle.r.native/fficall/src/truffle_common/Connections.c
@@ -21,142 +21,81 @@
  * questions.
  */
 
-
-#if FALSE
-#include <assert.h>
-#include <rffiutils.h>
+#include <Defn.h>
 #include <R_ext/Connections.h>
+// Note: R_ext/Connections.h depends on Defn.h,
+// but does not include it -> the order is important
 
-
-static jmethodID readConnMethodID;
-static jmethodID writeConnMethodID;
-static jmethodID getConnMethodID;
-static jmethodID getConnClassMethodID;
-static jmethodID getSummaryDescMethodID;
-static jmethodID isSeekableMethodID;
-static jmethodID getOpenModeMethodID;
-static jmethodID newCustomConnectionMethodID;
-
-static jbyteArray wrap(JNIEnv *thisenv, void* buf, size_t n) {
-    jbyteArray barr = (*thisenv)->NewByteArray(thisenv, n);
-    (*thisenv)->SetByteArrayRegion(thisenv, barr, 0, n, buf);
-    return barr;
-}
+#include "rffi_upcalls.h"
+#include "../truffle_nfi/rffiutils.h"
 
 /*
  * Returns the file descriptor of the connection if possible.
  * Otherwise an error is issued.
  */
 static int getFd(Rconnection con) {
-	return (int) con->id;
+    return (int) con->id;
 }
 
 /*
  * Sets the file descriptor for the connection.
  */
-static void setFd(Rconnection con, jint fd) {
-	con->id = (void *) (jlong) fd;
-}
-
-
-void init_connections(JNIEnv *env) {
-	/* int readConn(int, byte[]) */
-	readConnMethodID = checkGetMethodID(env, UpCallsRFFIClass, "R_ReadConnection", "(ILjava/lang/Object;)I", 0);
-
-	/* int writeConn(int, byte[]) */
-	writeConnMethodID = checkGetMethodID(env, UpCallsRFFIClass, "R_WriteConnection", "(ILjava/lang/Object;)I", 0);
-
-	/* RConnection getConnection(int) */
-	getConnMethodID = checkGetMethodID(env, UpCallsRFFIClass, "R_GetConnection", "(I)Ljava/lang/Object;", 0);
-
-
-	/* String getConnectionClassString(BaseRConnection) */
-	getConnClassMethodID = checkGetMethodID(env, UpCallsRFFIClass, "getConnectionClassString", "(Ljava/lang/Object;)Ljava/lang/String;", 0);
-
-	/* String getSummaryDescription(BaseRConnection) */
-	getSummaryDescMethodID = checkGetMethodID(env, UpCallsRFFIClass, "getSummaryDescription", "(Ljava/lang/Object;)Ljava/lang/String;", 0);
-
-	/* boolean isSeekable(BaseRConnection) */
-	isSeekableMethodID = checkGetMethodID(env, UpCallsRFFIClass, "isSeekable", "(Ljava/lang/Object;)Z", 0);
-
-	/* String getOpenModeString(BaseRConnection) */
-	getOpenModeMethodID = checkGetMethodID(env, UpCallsRFFIClass, "getOpenModeString", "(Ljava/lang/Object;)Ljava/lang/String;", 0);
-
-	/* int R_new_custom_connection(String, String, String) */
-	newCustomConnectionMethodID = checkGetMethodID(env, UpCallsRFFIClass, "R_new_custom_connection", "(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;", 0);
-}
-
-static char *connStringToChars(JNIEnv *env, jstring string) {
-    jsize len = (*env)->GetStringUTFLength(env, string);
-    const char *stringChars = (*env)->GetStringUTFChars(env, string, NULL);
-    char *copyChars = malloc((len + 1)*sizeof(char));
-    memcpy(copyChars, stringChars, len*sizeof(char));
-    copyChars[len] = 0;
-	(*env)->ReleaseStringUTFChars(env, string, stringChars);
-	(*env)->DeleteLocalRef(env, string);
-    return copyChars;
+static void setFd(Rconnection con, int fd) {
+    con->id = (void *) (long) fd;
 }
 
 /* ------------------- null connection functions --------------------- */
 
-static Rboolean NORET null_open(Rconnection con)
-{
+static Rboolean NORET null_open(Rconnection con) {
     error(_("%s not enabled for this connection"), "open");
 }
 
-static void null_close(Rconnection con)
-{
+static void null_close(Rconnection con) {
     con->isopen = FALSE;
 }
 
-static void null_destroy(Rconnection con)
-{
-    if(con->private) free(con->private);
+static void null_destroy(Rconnection con) {
+    if (con->private) free(con->private);
 }
 
-static int NORET null_vfprintf(Rconnection con, const char *format, va_list ap)
-{
+static int NORET null_vfprintf(Rconnection con, const char *format, va_list ap) {
     error(_("%s not enabled for this connection"), "printing");
 }
 
-static int NORET null_fgetc(Rconnection con)
-{
+static int NORET null_fgetc(Rconnection con) {
     error(_("%s not enabled for this connection"), "'getc'");
 }
 
-static double NORET null_seek(Rconnection con, double where, int origin, int rw)
-{
+static double NORET null_seek(Rconnection con, double where, int origin, int rw) {
     error(_("%s not enabled for this connection"), "'seek'");
 }
 
-static void NORET null_truncate(Rconnection con)
-{
+static void NORET null_truncate(Rconnection con) {
     error(_("%s not enabled for this connection"), "truncation");
 }
 
-static int null_fflush(Rconnection con)
-{
-	return 0;
+static int null_fflush(Rconnection con) {
+    return 0;
 }
 
 static size_t NORET null_read(void *ptr, size_t size, size_t nitems,
-			Rconnection con)
-{
+                              Rconnection con) {
     error(_("%s not enabled for this connection"), "'read'");
 }
 
 static size_t NORET null_write(const void *ptr, size_t size, size_t nitems,
-			 Rconnection con)
-{
+                               Rconnection con) {
     error(_("%s not enabled for this connection"), "'write'");
 }
 
+/* ------------------- connection structure functions --------------------- */
+
 static void init_con(Rconnection new, char *description, int enc,
-	      const char * const mode)
-{
+                     const char *const mode) {
     new->description = description;
     new->enc = enc;
-    strncpy(new->mode, mode, 4); new->mode[4] = '\0';
+    strncpy(new->mode, mode, 4);
+    new->mode[4] = '\0';
     new->isopen = new->incomplete = new->blocking = new->isGzcon = FALSE;
     new->canread = new->canwrite = TRUE; /* in principle */
     new->canseek = FALSE;
@@ -182,56 +121,50 @@ static void init_con(Rconnection new, char *description, int enc,
 }
 
 SEXP R_new_custom_connection(const char *description, const char *mode, const char *class_name, Rconnection *ptr) {
-	JNIEnv *thisenv = getEnv();
-	Rconnection new;
-	SEXP ans, class;
-
-	new = (Rconnection) malloc(sizeof(struct Rconn));
-	if (!new)
-		error(_("allocation of %s connection failed"), class_name);
-
-	jstring jsDescription = (*thisenv)->NewStringUTF(thisenv, description);
-	jstring jsMode = (*thisenv)->NewStringUTF(thisenv, mode);
-	jstring jsClassName = (*thisenv)->NewStringUTF(thisenv, class_name);
-	jobject addrObj = R_MakeExternalPtr(new, R_NilValue, R_NilValue);
-	ans = (*thisenv)->CallObjectMethod(thisenv, UpCallsRFFIObject, newCustomConnectionMethodID, jsDescription, jsMode, jsClassName, addrObj);
-	if (ans) {
-
-		new->class = (char *) malloc(strlen(class_name) + 1);
-		if (!new->class) {
-			free(new);
-			error(_("allocation of %s connection failed"), class_name);
-		}
-		strcpy(new->class, class_name);
-		new->description = (char *) malloc(strlen(description) + 1);
-		if (!new->description) {
-			free(new->class);
-			free(new);
-			error(_("allocation of %s connection failed"), class_name);
-		}
-		init_con(new, (char *) description, CE_NATIVE, mode);
-		/* all ptrs are init'ed to null_* so no need to repeat that,
-		 but the following two are useful tools which could not be accessed otherwise */
-		// TODO dummy_vfprintf and dummy_fgetc not implemented yet
-//    new->vfprintf = &dummy_vfprintf;
-//    new->fgetc = &dummy_fgetc;
-
-		/* new->blocking = block; */
-		new->encname[0] = 0; /* "" (should have the same effect as "native.enc") */
-		new->ex_ptr = R_MakeExternalPtr(new->id, install("connection"), R_NilValue);
-
-		class = allocVector(STRSXP, 2);
-		SET_STRING_ELT(class, 0, mkChar(class_name));
-		SET_STRING_ELT(class, 1, mkChar("connection"));
-		classgets(ans, class);
-//		setAttrib(ans, R_ConnIdSymbol, new->ex_ptr);
-
-		if (ptr) {
-			ptr[0] = new;
-		}
-	}
-
-	return ans;
+    Rconnection new = (Rconnection) malloc(sizeof(struct Rconn));
+    if (!new)
+        error(_("allocation of %s connection failed"), class_name);
+
+    SEXP addrObj = R_MakeExternalPtr(new, R_NilValue, R_NilValue);
+    SEXP fastRConn = ((call_R_new_custom_connection) callbacks[R_new_custom_connection_x])(description, mode,
+                                                                                           class_name, addrObj);
+    // printf("DEBUG: R_new_custom_connection address %p SEXP value %p\n", ptr, addrObj);
+    if (fastRConn) {
+        new->class = (char *) malloc(strlen(class_name) + 1);
+        if (!new->class) {
+            free(new);
+            error(_("allocation of %s connection failed"), class_name);
+        }
+        strcpy(new->class, class_name);
+        new->description = (char *) malloc(strlen(description) + 1);
+        if (!new->description) {
+            free(new->class);
+            free(new);
+            error(_("allocation of %s connection failed"), class_name);
+        }
+        init_con(new, (char *) description, CE_NATIVE, mode);
+        /* all ptrs are init'ed to null_* so no need to repeat that,
+         but the following two are useful tools which could not be accessed otherwise */
+        // TODO dummy_vfprintf and dummy_fgetc not implemented in FastR yet
+        //    new->vfprintf = &dummy_vfprintf;
+        //    new->fgetc = &dummy_fgetc;
+
+        /* new->blocking = block; */
+        new->encname[0] = 0; /* "" (should have the same effect as "native.enc") */
+        new->ex_ptr = R_MakeExternalPtr(new->id, install("connection"), R_NilValue);
+
+        SEXP class = allocVector(STRSXP, 2);
+        SET_STRING_ELT(class, 0, mkChar(class_name));
+        SET_STRING_ELT(class, 1, mkChar("connection"));
+        classgets(fastRConn, class);
+        // setAttrib(ans, R_ConnIdSymbol, new->ex_ptr); -- TODO not implemented/needed? in FastR
+
+        if (ptr) {
+            *ptr = new;
+        }
+    }
+
+    return fastRConn;
 }
 
 /*
@@ -241,192 +174,151 @@ SEXP R_new_custom_connection(const char *description, const char *mode, const ch
  * This currently assumes max. 64-bit addresses !
  */
 static Rconnection convertToAddress(SEXP addrObj) {
-	if(!inherits(addrObj, "externalptr")) {
-		error(_("invalid address object"));
-	}
+    if (!inherits(addrObj, "externalptr")) {
+        error(_("invalid address object"));
+    }
     return (Rconnection) R_ExternalPtrAddr(addrObj);
 
 }
 
-/*
- * This function is used as Java down call function to query the value of a connection's flag.
- * DO NOT CHANGE ITS SIGNATURE !
- * If changing the signature is unavoidable, adapt it in class 'NativeConnections'.
+/* --------------------------------------------------------------------------- */
+/* ------------------- Functions used as Java down calls --------------------- */
+
+/* These functions are invoked from Java when the user does some operation on a
+ * custom connection registered via R_new_custom_connection. We only have native
+ * functions for such connection. These functions are invoked through the same
+ * mechanism as e.g. the .C or .Call builtin, i.e. they accept only SEXP arguments
+ * and may be either void or return SEXP. Otherwise we'd have to provide signature
+ * of each function so that NFI knows how to convert the arguments and we'd have to
+ * provide another mechanism to call native functions (aside .C/.Call/etc.)
+ * with supplied signature. Therefore:
+ *
+ * DO NOT CHANGE SIGNATURE OF THESE FUNCTIONS!
+ * If you do, update 'NativeConnections.java' accordingly.
+ * Arguments and return type can ONLY BE SEXP.
  */
-SEXP __GetFlagNativeConnection(SEXP rConnAddrObj, jstring jname) {
-    JNIEnv *thisenv = getEnv();
-	Rconnection con = convertToAddress(rConnAddrObj);
-	const char *name = connStringToChars(thisenv, jname);
-	Rboolean result = 0;
-
-	if(strcmp(name, "text") == 0) {
-		result = con->text;
-	} else if(strcmp(name, "isopen") == 0) {
-		result = con->isopen;
-	}else if(strcmp(name, "incomplete") == 0) {
-		result = con->incomplete;
-	}else if(strcmp(name, "canread") == 0) {
-		result = con->canread;
-	}else if(strcmp(name, "canwrite") == 0) {
-		result = con->canwrite;
-	}else if(strcmp(name, "canseek") == 0) {
-		result = con->canseek;
-	}else if(strcmp(name, "blocking") == 0) {
-		result = con->blocking;
-	}
-	free((char *)name);
-
-	return ScalarLogical(result);
+
+SEXP __GetFlagNativeConnection(SEXP rConnAddrObj, SEXP nameVec) {
+    Rconnection con = convertToAddress(rConnAddrObj);
+    const char *name = CHAR(Rf_asChar(nameVec));
+    // printf("DEBUG: __GetFlagNativeConnection address %p SEXP value %p, flag '%s'\n", con, rConnAddrObj, name);
+    if (strcmp(name, "text") == 0) {
+        return ScalarLogical(con->text);
+    } else if (strcmp(name, "isopen") == 0) {
+        return ScalarLogical(con->isopen);
+    } else if (strcmp(name, "incomplete") == 0) {
+        return ScalarLogical(con->incomplete);
+    } else if (strcmp(name, "canread") == 0) {
+        return ScalarLogical(con->canread);
+    } else if (strcmp(name, "canwrite") == 0) {
+        return ScalarLogical(con->canwrite);
+    } else if (strcmp(name, "canseek") == 0) {
+        return ScalarLogical(con->canseek);
+    } else if (strcmp(name, "blocking") == 0) {
+        return ScalarLogical(con->blocking);
+    }
+    char errorBuffer[128];
+    sprintf(errorBuffer, "Unknown flag '%.12s' in __GetFlagNativeConnection. "
+            "This function should be used from NativeConnections.java", name);
+    error(errorBuffer);
 }
 
-/*
- * This function is used as Java down call function to invoke the open function of a natively created connection.
- * DO NOT CHANGE ITS SIGNATURE !
- * If changing the signature is unavoidable, adapt it in class 'NativeConnections'.
- */
 SEXP __OpenNativeConnection(SEXP rConnAddrObj) {
-	Rconnection con = convertToAddress(rConnAddrObj);
-	Rboolean success = con->open(con);
-	return ScalarLogical(success);
+    Rconnection con = convertToAddress(rConnAddrObj);
+    Rboolean success = con->open(con);
+    return ScalarLogical(success);
 }
 
-/*
- * This function is used as Java down call function to invoke the open function of a natively created connection.
- * DO NOT CHANGE ITS SIGNATURE !
- * If changing the signature is unavoidable, adapt it in class 'NativeConnections'.
- */
-SEXP __CloseNativeConnection(SEXP rConnAddrObj) {
-	Rconnection con = convertToAddress(rConnAddrObj);
-	con->close(con);
-	return NULL;
+void __CloseNativeConnection(SEXP rConnAddrObj) {
+    Rconnection con = convertToAddress(rConnAddrObj);
+    con->close(con);
 }
 
-/*
- * This function is used as Java down call function to invoke the read function of a natively created connection.
- * DO NOT CHANGE ITS SIGNATURE !
- * If changing the signature is unavoidable, adapt it in class 'NativeConnections'.
- */
-SEXP __ReadNativeConnection(SEXP rConnAddrObj, jbyteArray bufObj, SEXP nVec) {
-    JNIEnv *thisenv = getEnv();
+// Note: we do not check if connection is open, this should be done on the Java side
+
+SEXP __ReadNativeConnection(SEXP rConnAddrObj, SEXP bufVec, SEXP nVec) {
+    Rconnection con = convertToAddress(rConnAddrObj);
     int n = asInteger(nVec);
-	Rconnection con = convertToAddress(rConnAddrObj);
-	void *tmp_buf = (*thisenv)->GetByteArrayElements(thisenv, bufObj, NULL);
-	size_t nread = con->read(tmp_buf, 1, n, con);
-	// copy back and release buffer
-	(*thisenv)->ReleaseByteArrayElements(thisenv, bufObj, tmp_buf, JNI_COMMIT);
-	return ScalarInteger(nread);
+    if (!con->canread) {
+        error(_("cannot read from this connection"));
+    }
+    return ScalarInteger(con->read(RAW(bufVec), 1, n, con));
 }
 
-/*
- * This function is used as Java down call function to invoke the write function of a natively created connection.
- * DO NOT CHANGE ITS SIGNATURE !
- * If changing the signature is unavoidable, adapt it in class 'NativeConnections'.
- */
-SEXP __WriteNativeConnection(SEXP rConnAddrObj, jbyteArray bufObj, SEXP nVec) {
-    JNIEnv *thisenv = getEnv();
+SEXP __WriteNativeConnection(SEXP rConnAddrObj, SEXP bufVec, SEXP nVec) {
+    Rconnection con = convertToAddress(rConnAddrObj);
     int n = asInteger(nVec);
-	Rconnection con = convertToAddress(rConnAddrObj);
-	void *bytes = (*thisenv)->GetByteArrayElements(thisenv, bufObj, NULL);
-	size_t nwritten = con->write(bytes, 1, n, con);
-	// just release buffer
-	(*thisenv)->ReleaseByteArrayElements(thisenv, bufObj, bytes, JNI_ABORT);
-	return ScalarInteger(nwritten);
+    if (!con->canread) {
+        error(_("cannot read from this connection"));
+    }
+    return ScalarInteger(con->write(RAW(bufVec), 1, n, con));
 }
 
-/*
- * This function is used as Java down call function to invoke the seek function of a natively created connection.
- * DO NOT CHANGE ITS SIGNATURE !
- * If changing the signature is unavoidable, adapt it in class 'NativeConnections'.
- */
-SEXP __SeekNativeConnection(SEXP rConnAddrObj, SEXP whereObj, SEXP originObj, SEXP rwObj) {
-	Rconnection con = convertToAddress(rConnAddrObj);
-    double where = asReal(whereObj);
-    int origin = asInteger(originObj);
-    int rw = asInteger(rwObj);
-	double oldPos = con->seek(con, where, origin, rw);
-	return ScalarReal(oldPos);
+SEXP __SeekNativeConnection(SEXP rConnAddrObj, SEXP whereVec, SEXP originVec, SEXP rwVec) {
+    Rconnection con = convertToAddress(rConnAddrObj);
+    int rw = asInteger(rwVec);
+    int origin = asInteger(originVec);
+    double where = asReal(whereVec);
+    return ScalarReal(con->seek(con, where, origin, rw));
 }
 
+/* --------------------------------------------------------------------------- */
+/* ---------------- R API functions for accessing connections ---------------- */
+
+/* These functions are used by packages to read/write data from/to connections.
+ * We have to upcall to Java, because the default connections are implemented there.
+ * If the connection happens to be custom connection implemented on the native side,
+ * then the Java side finds out and downcalls back to e.g. __ReadNativeConnection
+ */
+
 size_t R_ReadConnection(Rconnection con, void *buf, size_t n) {
-    JNIEnv *thisenv = getEnv();
-    jbyteArray barr = (*thisenv)->NewByteArray(thisenv, n);
-
-    jint result =  (*thisenv)->CallIntMethod(thisenv, UpCallsRFFIObject, readConnMethodID, getFd(con), barr);
-    size_t readBytes = result >= 0 ? (size_t) result : 0;
-    assert(result <= (ssize_t) n);
-    if(result > 0) {
-    	(*thisenv)->GetByteArrayRegion(thisenv, barr, 0, result, buf);
-    }
-    return readBytes;
+    return (size_t) ((call_R_ReadConnection) callbacks[R_ReadConnection_x])(getFd(con), (long) buf, (int) n);
 }
 
 size_t R_WriteConnection(Rconnection con, void *buf, size_t n) {
-    JNIEnv *thisenv = getEnv();
-    jbyteArray barr = wrap(thisenv, buf, n);
-
-    jint result =  (*thisenv)->CallIntMethod(thisenv, UpCallsRFFIObject, writeConnMethodID, getFd(con), barr);
-    return result >= 0 ? (size_t) result : 0;
+    return (size_t) ((call_R_WriteConnection) callbacks[R_WriteConnection_x])(getFd(con), (long) buf, (int) n);
 }
 
 Rconnection R_GetConnection(SEXP sConn) {
-	if (!inherits(sConn, "connection")) {
-		error(_("invalid connection"));
-	}
-
-	int fd = asInteger(sConn);
-
-	JNIEnv *thisenv = getEnv();
-	jobject jRconn = (*thisenv)->CallObjectMethod(thisenv, UpCallsRFFIObject, getConnMethodID, fd);
-
-	// query getConnectionClassString
-	jstring jConnClass = (*thisenv)->CallObjectMethod(thisenv, UpCallsRFFIObject, getConnClassMethodID, jRconn);
-	const char *sConnClass;
-	if (jConnClass != 0) {
-		sConnClass = connStringToChars(thisenv, jConnClass);
-	}
-
-	// query getSummaryDescription
-	jstring jSummaryDesc = (*thisenv)->CallObjectMethod(thisenv, UpCallsRFFIObject, getSummaryDescMethodID, jRconn);
-	char *sSummaryDesc;
-	if (jSummaryDesc != 0) {
-		sSummaryDesc = connStringToChars(thisenv, jSummaryDesc);
-	}
-
-	// query isSeekable()
-	jboolean seekable = (*thisenv)->CallBooleanMethod(thisenv, UpCallsRFFIObject, isSeekableMethodID, jRconn);
-
-	// query getOpenMode
-	jstring jOpenMode = (*thisenv)->CallObjectMethod(thisenv, UpCallsRFFIObject, getOpenModeMethodID, jRconn);
-	char *sOpenMode;
-	if (jOpenMode != 0) {
-		sOpenMode = connStringToChars(thisenv, jOpenMode);
-	}
-
-	Rconnection new = (Rconnection) malloc(sizeof(struct Rconn));
-	if (!new) {
-		error(_("allocation of file connection failed"));
-	}
-
-	init_con(new, sSummaryDesc, 0, sOpenMode);
-	free(sOpenMode);
-	new->class = (char *) sConnClass;
-	new->canseek = seekable;
-
-	setFd(new, fd);
-
-// TODO implement up-call functions and set them
-//    new->open = &file_open;
-//    new->close = &file_close;
-//    new->vfprintf = &file_vfprintf;
-//    new->fgetc_internal = &file_fgetc_internal;
-//    new->fgetc = &dummy_fgetc;
-//    new->seek = &file_seek;
-//    new->truncate = &file_truncate;
-//    new->fflush = &file_fflush;
-//    new->read = &file_read;
-//    new->write = &file_write;
-
-	return new;
+    if (!inherits(sConn, "connection")) {
+        error(_("invalid connection"));
+    }
+
+    int fd = asInteger(sConn);
+
+    SEXP fastRCon = ((call_R_GetConnection) callbacks[R_GetConnection_x])(fd);
+    char *connClass = ((call_getConnectionClassString) callbacks[getConnectionClassString_x])(fastRCon);
+    char *summaryDesc = ((call_getSummaryDescription) callbacks[getSummaryDescription_x])(fastRCon);
+    char *openMode = ((call_getOpenModeString) callbacks[getOpenModeString_x])(fastRCon);
+    int isSeekable = ((call_isSeekable) callbacks[isSeekable_x])(fastRCon);
+
+    Rconnection new = (Rconnection) malloc(sizeof(struct Rconn));
+    if (!new) {
+        error(_("allocation of file connection failed"));
+    }
+
+    init_con(new, summaryDesc, 0, openMode);
+    free(openMode); // the init_con function makes a copy
+    new->class = connClass;
+    new->canseek = (Rboolean) isSeekable;
+    setFd(new, fd);
+
+    // TODO implement up-call functions and set them
+    // In fact reasonable code should see Rconnection as opaque pointer and does not attempt
+    // at calling these functions directly, but use e.g. R_WriteConnection instead. What is
+    // however important is to update the flags (e.g. opened) according to the current state
+    // on Java side -- i.e. writeLines may temporarily open-close the connection.
+    //    new->open = &file_open;
+    //    new->close = &file_close;
+    //    new->vfprintf = &file_vfprintf;
+    //    new->fgetc_internal = &file_fgetc_internal;
+    //    new->fgetc = &dummy_fgetc;
+    //    new->seek = &file_seek;
+    //    new->truncate = &file_truncate;
+    //    new->fflush = &file_fflush;
+    //    new->read = &file_read;
+    //    new->write = &file_write;
+
+    return new;
 }
 
-#endif
diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/NativeConnections.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/NativeConnections.java
index e6e7baa075..4f3a36f520 100644
--- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/NativeConnections.java
+++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/NativeConnections.java
@@ -41,6 +41,7 @@ import com.oracle.truffle.r.runtime.data.RDoubleVector;
 import com.oracle.truffle.r.runtime.data.RExternalPtr;
 import com.oracle.truffle.r.runtime.data.RIntVector;
 import com.oracle.truffle.r.runtime.data.RLogicalVector;
+import com.oracle.truffle.r.runtime.data.RRawVector;
 import com.oracle.truffle.r.runtime.ffi.CallRFFI;
 import com.oracle.truffle.r.runtime.ffi.DLL;
 import com.oracle.truffle.r.runtime.ffi.DLL.DLLInfo;
@@ -297,6 +298,14 @@ public class NativeConnections {
 
     private static class NativeChannel implements ByteChannel {
 
+        // Note: we wrap the ByteBuffer's array with a raw vector, which is on the native side
+        // converted to a pointer using RAW macro. This turns the raw vector into a native memory
+        // backed vector and any consecutive (write) operations in the native code are actually not
+        // done on the original vector that backs the byte buffer, so we need to copy back the date
+        // to the byte buffer. It would be more efficitent to use direct byte buffer, but then we'd
+        // need to make the native call interface (CallRFFI.InvokeCallRootNode) more flexible so
+        // that it can accept other argument types than SEXPs.
+
         private final NativeRConnection base;
 
         NativeChannel(NativeRConnection base) {
@@ -307,16 +316,19 @@ public class NativeConnections {
         public int read(ByteBuffer dst) throws IOException {
             NativeCallInfo ni = NativeConnections.getNativeFunctionInfo(READ_NATIVE_CONNECTION);
             RootCallTarget nativeCallTarget = CallRFFI.InvokeCallRootNode.create().getCallTarget();
-            Object call = nativeCallTarget.call(ni, new Object[]{base.addr, dst.array(), dst.remaining()});
-
-            if (call instanceof RIntVector) {
-                int nread = ((RIntVector) call).getDataAt(0);
-                // update buffer's position !
-                dst.position(nread);
-                return nread;
+            RRawVector bufferVec = RDataFactory.createRawVector(dst.remaining());
+            Object call = nativeCallTarget.call(ni, new Object[]{base.addr, bufferVec, dst.remaining()});
+            if (!(call instanceof RIntVector)) {
+                throw RInternalError.shouldNotReachHere("unexpected result type from native function, did the signature change?");
             }
-
-            throw RInternalError.shouldNotReachHere("unexpected result type");
+            int nread = ((RIntVector) call).getDataAt(0);
+            if (nread > 0) {
+                // this should also update the buffer position
+                for (int i = 0; i < bufferVec.getLength(); i++) {
+                    dst.put(bufferVec.getRawDataAt(i));
+                }
+            }
+            return nread;
         }
 
         @Override
@@ -327,7 +339,7 @@ public class NativeConnections {
         @Override
         public void close() throws IOException {
             NativeCallInfo ni = NativeConnections.getNativeFunctionInfo(CLOSE_NATIVE_CONNECTION);
-            RootCallTarget nativeCallTarget = CallRFFI.InvokeCallRootNode.create().getCallTarget();
+            RootCallTarget nativeCallTarget = CallRFFI.InvokeVoidCallRootNode.create().getCallTarget();
             nativeCallTarget.call(ni, new Object[]{base.addr});
         }
 
@@ -335,13 +347,15 @@ public class NativeConnections {
         public int write(ByteBuffer src) throws IOException {
             NativeCallInfo ni = NativeConnections.getNativeFunctionInfo(WRITE_NATIVE_CONNECTION);
             RootCallTarget nativeCallTarget = CallRFFI.InvokeCallRootNode.create().getCallTarget();
-
-            Object result = nativeCallTarget.call(ni, new Object[]{base.addr, src.array(), src.remaining()});
-
+            ByteBuffer slice = src;
+            if (src.position() > 0) {
+                slice = src.slice();
+            }
+            RRawVector bufferVec = RDataFactory.createRawVector(slice.array());
+            Object result = nativeCallTarget.call(ni, new Object[]{base.addr, bufferVec, src.remaining()});
             if (result instanceof RIntVector) {
                 return ((RIntVector) result).getDataAt(0);
             }
-
             throw RInternalError.shouldNotReachHere("unexpected result type");
         }
     }
diff --git a/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/R/testrffi.R b/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/R/testrffi.R
index 3a42a04a22..90b45a76cf 100644
--- a/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/R/testrffi.R
+++ b/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/R/testrffi.R
@@ -176,3 +176,15 @@ rffi.captureDotsWithSingleElement <- function(env) {
 rffi.evalAndNativeArrays <- function(vec, expr, env) {
     .Call('test_evalAndNativeArrays', vec, expr, env)
 }
+
+rffi.writeConnection <- function(connection) {
+    .Call('test_writeConnection', connection);
+}
+
+rffi.readConnection <- function(connection) {
+    .Call('test_readConnection', connection);
+}
+
+rffi.createNativeConnection <- function() {
+    .Call('test_createNativeConnection');
+}
\ No newline at end of file
diff --git a/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/src/init.c b/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/src/init.c
index 4323f77fb0..ba9eb2458c 100644
--- a/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/src/init.c
+++ b/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/src/init.c
@@ -77,6 +77,9 @@ static const R_CallMethodDef CallEntries[] = {
         CALLDEF(test_stringNA, 0),
         CALLDEF(test_captureDotsWithSingleElement, 1),
         CALLDEF(test_evalAndNativeArrays, 3),
+        CALLDEF(test_writeConnection, 1),
+        CALLDEF(test_readConnection, 1),
+        CALLDEF(test_createNativeConnection, 0),
         {NULL, NULL, 0}
 };
 
diff --git a/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/src/testrffi.c b/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/src/testrffi.c
index 9f92cbc5ad..92287d4ab1 100644
--- a/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/src/testrffi.c
+++ b/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/src/testrffi.c
@@ -28,6 +28,8 @@
 #include <Rinterface.h>
 #include <Rinternals.h>
 #include <Rinterface.h>
+#include <R_ext/Connections.h>
+#include <string.h>
 #include "testrffi.h"
 
 void dotCModifiedArguments(int* len, int* idata, double* rdata, int* ldata, char** cdata) {
@@ -431,3 +433,91 @@ SEXP test_evalAndNativeArrays(SEXP vec, SEXP expr, SEXP env) {
     UNPROTECT(uprotectCount);
     return vec;
 }
+
+SEXP test_writeConnection(SEXP connVec) {
+	Rconnection connection = R_GetConnection(connVec);
+	char* greeting = "Hello from R_WriteConnection";
+	R_WriteConnection(connection, greeting, strlen(greeting));
+    return R_NilValue;
+}
+
+SEXP test_readConnection(SEXP connVec) {
+    Rconnection connection = R_GetConnection(connVec);
+    unsigned char buffer[255];
+    int size = R_ReadConnection(connection, buffer, 255);
+    SEXP result;
+    PROTECT(result = allocVector(RAWSXP, size));
+    unsigned char* resultData = RAW(result);
+    for (int i = 0; i < size; ++i) {
+        resultData[i] = buffer[i];
+    }
+    UNPROTECT(1);
+    return result;
+}
+
+static Rconnection customConn;
+
+static void printNow(const char* message) {
+    puts(message);
+    fflush(stdout);
+}
+
+static void testrfficonn_destroy(Rconnection conn) {
+    if (conn != customConn) {
+        printNow("ERROR: destroy function did not receive expected argument\n");
+    } else {
+        printNow("Custom connection destroyed\n");
+    }
+}
+
+static Rboolean testrfficonn_open(Rconnection conn) {
+    if (conn != customConn) {
+        printNow("ERROR: open function did not receive expected argument\n");
+        return 0;
+    } else {
+        printNow("Custom connection opened\n");
+        return 1;
+    }
+}
+
+static void testrfficonn_close(Rconnection conn) {
+    if (conn != customConn) {
+        printNow("ERROR: close function did not receive expected argument\n");
+    } else {
+        printNow("Custom connection closed\n");
+    }
+}
+
+static size_t testrfficonn_write(const void * message, size_t size, size_t nitems, Rconnection conn) {
+    if (conn != customConn) {
+        printNow("ERROR: write function did not receive expected argument\n");
+        return 0;
+    } else {
+        printf("Custom connection printing: %.*s\n", (int) (size * nitems), (char*) message);
+        fflush(stdout);
+        return size * nitems;
+    }
+}
+
+static size_t testrfficonn_read(void *buffer, size_t size, size_t niterms, Rconnection conn) {
+    if (conn != customConn) {
+        printNow("ERROR: read function did not receive expected argument\n");
+        return 0;
+    } else if (size * niterms > 0) {
+        ((char *)buffer)[0] = 'Q';
+        return 1;
+    }
+    return 0;
+}
+
+SEXP test_createNativeConnection() {
+    SEXP newConnSEXP = R_new_custom_connection("Connection for testing purposes", "w", "testrfficonn", &customConn);
+    customConn->isopen = 0;
+    customConn->canwrite = 1;
+    customConn->destroy = &testrfficonn_destroy;
+    customConn->open = &testrfficonn_open;
+    customConn->close = &testrfficonn_close;
+    customConn->write = &testrfficonn_write;
+    // customConn->read = &testrfficonn_read; TODO: read test
+    return newConnSEXP;
+}
diff --git a/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/src/testrffi.h b/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/src/testrffi.h
index 6b55ae486c..21f9e40baf 100644
--- a/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/src/testrffi.h
+++ b/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/src/testrffi.h
@@ -97,3 +97,9 @@ extern SEXP test_stringNA(void);
 extern SEXP test_captureDotsWithSingleElement(SEXP env);
 
 extern SEXP test_evalAndNativeArrays(SEXP vec, SEXP expr, SEXP env);
+
+extern SEXP test_writeConnection(SEXP conn);
+
+extern SEXP test_readConnection(SEXP conn);
+
+extern SEXP test_createNativeConnection(void);
diff --git a/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/tests/connections.R b/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/tests/connections.R
new file mode 100644
index 0000000000..c326b6055e
--- /dev/null
+++ b/com.oracle.truffle.r.test.native/packages/testrffi/testrffi/tests/connections.R
@@ -0,0 +1,44 @@
+# Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+#
+# This code is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License version 2 only, as
+# published by the Free Software Foundation.
+#
+# This code is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+# version 2 for more details (a copy is included in the LICENSE file that
+# accompanied this code).
+#
+# You should have received a copy of the GNU General Public License version
+# 2 along with this work; if not, write to the Free Software Foundation,
+# Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+# Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+# or visit www.oracle.com if you need additional information or have any
+# questions.
+
+stopifnot(require(testrffi))
+
+# invoking the R_WriteConnection and R_ReadConnection from native code
+zz <- file("testbinconn", "wb")
+rffi.writeConnection(zz)
+close(zz)
+readLines("testbinconn")
+zz <- file("testbinconn", "rb")
+rffi.readConnection(zz);
+unlink("testbinconn")
+
+# custom connections implemented in native code
+nativeConn <- rffi.createNativeConnection()
+nativeConn
+writeChar("Hello to custom native connections!\n", nativeConn)
+
+# TODO: close ignored -- FastR does not call destroy
+# close(nativeConn)
+
+# TODO: read test is not working
+# FastR reports: Error in readChar(nativeConn2, 42) : invalid connection
+# nativeConn2 <- rffi.createNativeConnection()
+# readChar(nativeConn2, 42)
\ No newline at end of file
-- 
GitLab