diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java index d43eeb8e783e736fc98265c4972b04c6e6cd8f68..79911c2ea1a89f89c83829060c0a7021336f1895 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java @@ -54,26 +54,40 @@ import com.oracle.truffle.r.runtime.data.model.RAbstractStringVector; * </p> */ abstract class DelegateRConnection implements RConnection, ByteChannel { - private static final int DEFAULT_CACHE_SIZE = 16 * 1024; + public static final int DEFAULT_CACHE_SIZE = 16 * 1024; protected final BaseRConnection base; private final ByteBuffer cache; + private final boolean readCache; DelegateRConnection(BaseRConnection base) { - this(base, DEFAULT_CACHE_SIZE); + this(base, DEFAULT_CACHE_SIZE, true); } - DelegateRConnection(BaseRConnection base, int cacheSize) { + DelegateRConnection(BaseRConnection base, int cacheSize, boolean readCache) { this.base = Objects.requireNonNull(base); + this.readCache = readCache; if (cacheSize > 0) { cache = ByteBuffer.allocate(cacheSize); - // indicate that there are no remaining bytes in the buffer - cache.flip(); + + // indicate that there are no remaining bytes in the buffer to read + if (readCache) { + cache.flip(); + } } else { cache = null; } } + private static int transfer(ByteBuffer from, ByteBuffer to) { + int nbytes = Math.min(to.remaining(), from.remaining()); + if (nbytes > 0) { + to.put(from.array(), from.arrayOffset() + from.position(), nbytes); + from.position(from.position() + nbytes); + } + return nbytes; + } + @Override public int getDescriptor() { return base.getDescriptor(); @@ -438,7 +452,7 @@ abstract class DelegateRConnection implements RConnection, ByteChannel { @Override @TruffleBoundary public int read(ByteBuffer dst) throws IOException { - if (cache != null) { + if (readCache && cache != null) { final int bytesRequested = dst.remaining(); int totalBytesRead = 0; int bytesToRead = 0; @@ -459,7 +473,19 @@ abstract class DelegateRConnection implements RConnection, ByteChannel { @Override @TruffleBoundary public int write(ByteBuffer src) throws IOException { - return getChannel().write(src); + if (!readCache && cache != null) { + int total = 0; + while (src.hasRemaining()) { + total += transfer(src, cache); + if (!cache.hasRemaining()) { + flush(); + } + } + return total; + } else { + invalidateCache(); + return getChannel().write(src); + } } /** @@ -471,7 +497,7 @@ abstract class DelegateRConnection implements RConnection, ByteChannel { * </p> */ protected int readInternal() throws IOException { - if (cache != null) { + if (readCache && cache != null) { ensureDataAvailable(1); if (!cache.hasRemaining()) { return -1; @@ -567,7 +593,11 @@ abstract class DelegateRConnection implements RConnection, ByteChannel { @Override public void flush() throws IOException { - // nothing to do for channels + if (!readCache && cache != null) { + cache.flip(); + getChannel().write(cache); + cache.clear(); + } } @Override diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java index d4b11c2d5723c037e1839c5f26b05f31b6e98bb5..ba4245207dd183a0c38d76259ca6c078a764431e 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java @@ -38,7 +38,7 @@ public abstract class DelegateReadRConnection extends DelegateRConnection { } protected DelegateReadRConnection(BaseRConnection base, int cacheSize) { - super(base, cacheSize); + super(base, cacheSize, true); } @Override diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java index c445251b02495f80207f0c2507bcc046e64eadb9..01c2464bd6523b45e5ddf7ff394780b0c061a387 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java @@ -31,7 +31,7 @@ abstract class DelegateReadWriteRConnection extends DelegateRConnection { } protected DelegateReadWriteRConnection(BaseRConnection base, int cacheSize) { - super(base, cacheSize); + super(base, cacheSize, true); } } diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java index 16503501a0914f0e299d5e7c06f1eda99d0d4e88..f436a0a2f12b44520dafa0c0a63caff6467bde1a 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java @@ -33,7 +33,11 @@ import com.oracle.truffle.r.runtime.conn.ConnectionSupport.BaseRConnection; abstract class DelegateWriteRConnection extends DelegateRConnection { protected DelegateWriteRConnection(BaseRConnection base) { - super(base, 0); + super(base, 0, false); + } + + protected DelegateWriteRConnection(BaseRConnection base, int cacheSize) { + super(base, cacheSize, false); } @Override @@ -75,4 +79,10 @@ abstract class DelegateWriteRConnection extends DelegateRConnection { public boolean canWrite() { return true; } + + @Override + public void close() throws IOException { + flush(); + super.close(); + } } diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java index b14529c998dfd7df4fa202a0df4e432724a99b7c..36e4f8e3fc6c6f19d8ecf941678405190e11214c 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java @@ -324,7 +324,7 @@ public class FileConnections { private final FileChannel channel; FileWriteBinaryConnection(BasePathRConnection base, boolean append) throws IOException { - super(base); + super(base, 0); List<OpenOption> opts = new ArrayList<>(); opts.add(StandardOpenOption.WRITE); opts.add(StandardOpenOption.CREATE);