From 55edb89ac2e0a033a3ece869c318f77b9d31e3de Mon Sep 17 00:00:00 2001
From: Florian Angerer <florian.angerer@oracle.com>
Date: Mon, 19 Jun 2017 14:30:52 +0200
Subject: [PATCH] Implemented write cache for connections.

---
 .../r/runtime/conn/DelegateRConnection.java   | 48 +++++++++++++++----
 .../runtime/conn/DelegateReadRConnection.java |  2 +-
 .../conn/DelegateReadWriteRConnection.java    |  2 +-
 .../conn/DelegateWriteRConnection.java        | 12 ++++-
 .../r/runtime/conn/FileConnections.java       |  2 +-
 5 files changed, 53 insertions(+), 13 deletions(-)

diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java
index d43eeb8e78..79911c2ea1 100644
--- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java
+++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java
@@ -54,26 +54,40 @@ import com.oracle.truffle.r.runtime.data.model.RAbstractStringVector;
  * </p>
  */
 abstract class DelegateRConnection implements RConnection, ByteChannel {
-    private static final int DEFAULT_CACHE_SIZE = 16 * 1024;
+    public static final int DEFAULT_CACHE_SIZE = 16 * 1024;
     protected final BaseRConnection base;
     private final ByteBuffer cache;
+    private final boolean readCache;
 
     DelegateRConnection(BaseRConnection base) {
-        this(base, DEFAULT_CACHE_SIZE);
+        this(base, DEFAULT_CACHE_SIZE, true);
     }
 
-    DelegateRConnection(BaseRConnection base, int cacheSize) {
+    DelegateRConnection(BaseRConnection base, int cacheSize, boolean readCache) {
         this.base = Objects.requireNonNull(base);
+        this.readCache = readCache;
 
         if (cacheSize > 0) {
             cache = ByteBuffer.allocate(cacheSize);
-            // indicate that there are no remaining bytes in the buffer
-            cache.flip();
+
+            // indicate that there are no remaining bytes in the buffer to read
+            if (readCache) {
+                cache.flip();
+            }
         } else {
             cache = null;
         }
     }
 
+    private static int transfer(ByteBuffer from, ByteBuffer to) {
+        int nbytes = Math.min(to.remaining(), from.remaining());
+        if (nbytes > 0) {
+            to.put(from.array(), from.arrayOffset() + from.position(), nbytes);
+            from.position(from.position() + nbytes);
+        }
+        return nbytes;
+    }
+
     @Override
     public int getDescriptor() {
         return base.getDescriptor();
@@ -438,7 +452,7 @@ abstract class DelegateRConnection implements RConnection, ByteChannel {
     @Override
     @TruffleBoundary
     public int read(ByteBuffer dst) throws IOException {
-        if (cache != null) {
+        if (readCache && cache != null) {
             final int bytesRequested = dst.remaining();
             int totalBytesRead = 0;
             int bytesToRead = 0;
@@ -459,7 +473,19 @@ abstract class DelegateRConnection implements RConnection, ByteChannel {
     @Override
     @TruffleBoundary
     public int write(ByteBuffer src) throws IOException {
-        return getChannel().write(src);
+        if (!readCache && cache != null) {
+            int total = 0;
+            while (src.hasRemaining()) {
+                total += transfer(src, cache);
+                if (!cache.hasRemaining()) {
+                    flush();
+                }
+            }
+            return total;
+        } else {
+            invalidateCache();
+            return getChannel().write(src);
+        }
     }
 
     /**
@@ -471,7 +497,7 @@ abstract class DelegateRConnection implements RConnection, ByteChannel {
      * </p>
      */
     protected int readInternal() throws IOException {
-        if (cache != null) {
+        if (readCache && cache != null) {
             ensureDataAvailable(1);
             if (!cache.hasRemaining()) {
                 return -1;
@@ -567,7 +593,11 @@ abstract class DelegateRConnection implements RConnection, ByteChannel {
 
     @Override
     public void flush() throws IOException {
-        // nothing to do for channels
+        if (!readCache && cache != null) {
+            cache.flip();
+            getChannel().write(cache);
+            cache.clear();
+        }
     }
 
     @Override
diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java
index d4b11c2d57..ba4245207d 100644
--- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java
+++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java
@@ -38,7 +38,7 @@ public abstract class DelegateReadRConnection extends DelegateRConnection {
     }
 
     protected DelegateReadRConnection(BaseRConnection base, int cacheSize) {
-        super(base, cacheSize);
+        super(base, cacheSize, true);
     }
 
     @Override
diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java
index c445251b02..01c2464bd6 100644
--- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java
+++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java
@@ -31,7 +31,7 @@ abstract class DelegateReadWriteRConnection extends DelegateRConnection {
     }
 
     protected DelegateReadWriteRConnection(BaseRConnection base, int cacheSize) {
-        super(base, cacheSize);
+        super(base, cacheSize, true);
     }
 
 }
diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java
index 16503501a0..f436a0a2f1 100644
--- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java
+++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java
@@ -33,7 +33,11 @@ import com.oracle.truffle.r.runtime.conn.ConnectionSupport.BaseRConnection;
 abstract class DelegateWriteRConnection extends DelegateRConnection {
 
     protected DelegateWriteRConnection(BaseRConnection base) {
-        super(base, 0);
+        super(base, 0, false);
+    }
+
+    protected DelegateWriteRConnection(BaseRConnection base, int cacheSize) {
+        super(base, cacheSize, false);
     }
 
     @Override
@@ -75,4 +79,10 @@ abstract class DelegateWriteRConnection extends DelegateRConnection {
     public boolean canWrite() {
         return true;
     }
+
+    @Override
+    public void close() throws IOException {
+        flush();
+        super.close();
+    }
 }
diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java
index b14529c998..36e4f8e3fc 100644
--- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java
+++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java
@@ -324,7 +324,7 @@ public class FileConnections {
         private final FileChannel channel;
 
         FileWriteBinaryConnection(BasePathRConnection base, boolean append) throws IOException {
-            super(base);
+            super(base, 0);
             List<OpenOption> opts = new ArrayList<>();
             opts.add(StandardOpenOption.WRITE);
             opts.add(StandardOpenOption.CREATE);
-- 
GitLab