diff --git a/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/base/BasePackage.java b/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/base/BasePackage.java index 987209e766adde27b169110c9a96055d26585c35..4d3905afb99422e84c908bf76dd7073ac61c706d 100644 --- a/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/base/BasePackage.java +++ b/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/base/BasePackage.java @@ -266,6 +266,7 @@ public class BasePackage extends RBuiltinPackage { add(ConnectionFunctions.ReadChar.class, ConnectionFunctionsFactory.ReadCharNodeGen::create); add(ConnectionFunctions.ReadLines.class, ConnectionFunctionsFactory.ReadLinesNodeGen::create); add(ConnectionFunctions.Seek.class, ConnectionFunctionsFactory.SeekNodeGen::create); + add(ConnectionFunctions.Truncate.class, ConnectionFunctionsFactory.TruncateNodeGen::create); add(ConnectionFunctions.SocketConnection.class, ConnectionFunctionsFactory.SocketConnectionNodeGen::create); add(ConnectionFunctions.RawConnection.class, ConnectionFunctionsFactory.RawConnectionNodeGen::create); add(ConnectionFunctions.RawConnectionValue.class, ConnectionFunctionsFactory.RawConnectionValueNodeGen::create); diff --git a/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/base/ConnectionFunctions.java b/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/base/ConnectionFunctions.java index 30aa8c4efb3bddaa7fb3ba973fa57c835891e778..785b4a0f00c0d35c43f3032cfa1c2a56a2c401ae 100644 --- a/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/base/ConnectionFunctions.java +++ b/com.oracle.truffle.r.nodes.builtin/src/com/oracle/truffle/r/nodes/builtin/base/ConnectionFunctions.java @@ -241,7 +241,7 @@ public abstract class ConnectionFunctions { // ignore and try to open file } catch (IOException e) { RError.warning(RError.SHOW_CALLER, RError.Message.UNABLE_TO_RESOLVE, e.getMessage()); - throw RError.error(RError.SHOW_CALLER, RError.Message.CANNOT_OPEN_CONNECTION); + throw error(RError.Message.CANNOT_OPEN_CONNECTION); } if (path.length() == 0) { @@ -295,7 +295,7 @@ public abstract class ConnectionFunctions { } catch (IOException ex) { throw reportError(description, ex); } catch (IllegalCharsetNameException ex) { - throw RError.error(RError.SHOW_CALLER, RError.Message.UNSUPPORTED_ENCODING_CONVERSION, encoding, ""); + throw error(RError.Message.UNSUPPORTED_ENCODING_CONVERSION, encoding, ""); } } @@ -1258,10 +1258,10 @@ public abstract class ConnectionFunctions { try { return new FifoRConnection(path, open, blocking, encoding).asVector(); } catch (IOException ex) { - RError.warning(this, RError.Message.CANNOT_OPEN_FIFO, path); - throw RError.error(RError.SHOW_CALLER, RError.Message.CANNOT_OPEN_CONNECTION); + warning(RError.Message.CANNOT_OPEN_FIFO, path); + throw error(RError.Message.CANNOT_OPEN_CONNECTION); } catch (IllegalCharsetNameException ex) { - throw RError.error(RError.SHOW_CALLER, RError.Message.UNSUPPORTED_ENCODING_CONVERSION, encoding, ""); + throw error(RError.Message.UNSUPPORTED_ENCODING_CONVERSION, encoding, ""); } } } @@ -1284,10 +1284,10 @@ public abstract class ConnectionFunctions { try { return new PipeRConnection(path, open, encoding).asVector(); } catch (IOException ex) { - RError.warning(this, RError.Message.CANNOT_OPEN_FIFO, path); - throw RError.error(RError.SHOW_CALLER, RError.Message.CANNOT_OPEN_CONNECTION); + warning(RError.Message.CANNOT_OPEN_FIFO, path); + throw error(RError.Message.CANNOT_OPEN_CONNECTION); } catch (IllegalCharsetNameException ex) { - throw RError.error(RError.SHOW_CALLER, RError.Message.UNSUPPORTED_ENCODING_CONVERSION, encoding, ""); + throw error(RError.Message.UNSUPPORTED_ENCODING_CONVERSION, encoding, ""); } } } @@ -1308,4 +1308,25 @@ public abstract class ConnectionFunctions { return RDataFactory.createLogicalVectorFromScalar(res); } } + + @RBuiltin(name = "truncate", kind = INTERNAL, parameterNames = {"con"}, behavior = IO) + public abstract static class Truncate extends RBuiltinNode { + + static { + Casts casts = new Casts(Truncate.class); + CastsHelper.connection(casts); + } + + @Specialization + @TruffleBoundary + protected RNull truncate(int con) { + + try { + RConnection.fromIndex(con).truncate(); + } catch (IOException e) { + throw error(RError.Message.TRUNCATE_UNSUPPORTED_FOR_CONN, e.getMessage()); + } + return RNull.instance; + } + } } diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/RError.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/RError.java index 6e8c5a12079d94ce95fab22b2ba509ba6cdf1054..8a7f66a247258f803b5df49579b82c53cdd83071 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/RError.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/RError.java @@ -841,7 +841,12 @@ public final class RError extends RuntimeException { UNABLE_TO_RESOLVE("unable to resolve '%s'"), LINE_CONTAINS_EMBEDDED_NULLS("line %d appears to contain an embedded nul"), UNSUPPORTED_URL_METHOD("method = \"%s\" is not supported"), - CANNOT_REPLICATE_NULL("cannot replicate NULL to a non-zero length"); + CANNOT_REPLICATE_NULL("cannot replicate NULL to a non-zero length"), + TRUNCATE_ONLY_WRITE_CONNECTION("can only truncate connections open for writing"), + TRUNCATE_ONLY_OPEN_CONN("can only truncate an open connection"), + TRUNCATE_NOT_ENABLED("truncation not enabled for this connection"), + TRUNCATE_UNSUPPORTED_FOR_CONN("cannot truncate connection: %s"), + INCOMPLETE_STRING_AT_EOF_DISCARDED("incomplete string at end of file has been discarded"); public final String message; final boolean hasArgs; diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/ConnectionSupport.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/ConnectionSupport.java index 3e0941dce29b4a1325dbe39163aad99d1a9b039a..c0c2e5bf8e22cd2c7e86528a37017a5d5fd6866b 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/ConnectionSupport.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/ConnectionSupport.java @@ -482,6 +482,11 @@ public class ConnectionSupport { public ByteChannel getChannel() throws IOException { throw RInternalError.shouldNotReachHere("INVALID CONNECTION"); } + + @Override + public void truncate() throws IOException { + throw RInternalError.shouldNotReachHere("INVALID CONNECTION"); + } } /** @@ -760,7 +765,7 @@ public class ConnectionSupport { @Override public ByteChannel getChannel() throws IOException { checkOpen(); - return theConnection.getChannel(); + return theConnection; } @Override @@ -1031,6 +1036,16 @@ public class ConnectionSupport { return seekInternal(offset, seekMode, seekRWMode); } + @Override + public void truncate() throws IOException { + checkOpen(); + if (!closed && opened) { + theConnection.truncate(); + } else { + throw RError.error(RError.SHOW_CALLER, RError.Message.TRUNCATE_ONLY_OPEN_CONN); + } + } + /** * Returns {@code true} iff the last read operation was blocked or there is unflushed * output. diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java index 0139c2e9a71b7867f954a047e6b0f8acc87d21a4..e31aaa0bd54fb129263d7096a2f5213fd5d5cf15 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java @@ -24,8 +24,12 @@ package com.oracle.truffle.r.runtime.conn; import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.Reader; import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; @@ -52,12 +56,25 @@ import sun.nio.cs.StreamDecoder; * operations. * </p> */ -abstract class DelegateRConnection implements RConnection { +abstract class DelegateRConnection implements RConnection, ByteChannel { + private static final int DEFAULT_CACHE_SIZE = 16 * 1024; protected final BaseRConnection base; - private StreamDecoder decoder; + private final ByteBuffer cache; DelegateRConnection(BaseRConnection base) { + this(base, DEFAULT_CACHE_SIZE); + } + + DelegateRConnection(BaseRConnection base, int cacheSize) { this.base = Objects.requireNonNull(base); + + if (cacheSize > 0) { + cache = ByteBuffer.allocate(cacheSize); + // indicate that there are no remaining bytes in the buffer + cache.flip(); + } else { + cache = null; + } } @Override @@ -80,12 +97,19 @@ abstract class DelegateRConnection implements RConnection { return base.forceOpen(modeString); } + @SuppressWarnings("unused") + protected long seekInternal(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { + throw RInternalError.shouldNotReachHere("seek has not been implemented for this connection"); + } + @Override - public long seek(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { + public final long seek(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { if (!isSeekable()) { throw RError.error(RError.SHOW_CALLER, RError.Message.NOT_ENABLED_FOR_THIS_CONN, "seek"); } - throw RInternalError.shouldNotReachHere("seek has not been implemented for this connection"); + final long res = seekInternal(offset, seekMode, seekRWMode); + invalidateCache(); + return res; } /** @@ -101,6 +125,7 @@ abstract class DelegateRConnection implements RConnection { base.setIncomplete(false); ArrayList<String> lines = new ArrayList<>(); int totalRead = 0; + int nBytesConsumed = 0; byte[] buffer = new byte[64]; int pushBack = 0; boolean nullRead = false; @@ -110,7 +135,7 @@ abstract class DelegateRConnection implements RConnection { ch = pushBack; pushBack = 0; } else { - ch = getc(); + ch = readInternal(); } boolean lineEnd = false; if (ch < 0) { @@ -120,6 +145,7 @@ abstract class DelegateRConnection implements RConnection { * lines, otherwise keep data and output warning. */ final String incompleteFinalLine = new String(buffer, 0, totalRead, base.getEncoding()); + nBytesConsumed += totalRead; if (!base.isBlocking() && base.isTextMode()) { base.pushBack(RDataFactory.createStringVector(incompleteFinalLine), false); base.setIncomplete(true); @@ -136,7 +162,7 @@ abstract class DelegateRConnection implements RConnection { lineEnd = true; } else if (ch == '\r') { lineEnd = true; - ch = getc(); + ch = readInternal(); if (ch == '\n') { // swallow the trailing lf } else { @@ -150,6 +176,7 @@ abstract class DelegateRConnection implements RConnection { } if (lineEnd) { lines.add(new String(buffer, 0, totalRead, base.getEncoding())); + nBytesConsumed += totalRead; if (n > 0 && lines.size() == n) { break; } @@ -167,9 +194,32 @@ abstract class DelegateRConnection implements RConnection { } String[] result = new String[lines.size()]; lines.toArray(result); + updateReadOffset(nBytesConsumed); return result; } + /** + * Updates the read cursor.<br> + * <p> + * Called by methods using {@link #readInternal()} to tell how many bytes have been consumed to + * be able to update a read curosor if available. + * </p> + * + * @param nBytesConsumed Number of bytes consumed by a read operation. + */ + protected void updateReadOffset(int nBytesConsumed) { + // default: nothing to do + } + + @Override + public String readChar(int nchars, boolean useBytes) throws IOException { + if (useBytes) { + return DelegateRConnection.readCharHelper(nchars, this); + } else { + return DelegateRConnection.readCharHelper(nchars, getDecoder(nchars)); + } + } + /** * Writes a string to a channel. * @@ -233,29 +283,6 @@ abstract class DelegateRConnection implements RConnection { channel.write(buf); } - /** - * Reads null-terminated character strings from a {@link ReadableByteChannel}. - */ - public static byte[] readBinCharsHelper(ReadableByteChannel channel) throws IOException { - ByteBuffer buf = ByteBuffer.allocate(1); - int numRead = channel.read(buf); - if (numRead <= 0) { - return null; - } - int totalRead = 0; - byte[] buffer = new byte[64]; - while (true) { - buffer = DelegateRConnection.checkBuffer(buffer, totalRead); - buffer[totalRead++] = (byte) (buf.get() & 0xFF); - if (numRead == 0) { - break; - } - buf.clear(); - numRead = channel.read(buf); - } - return buffer; - } - /** * Reads a specified amount of characters. * @@ -329,7 +356,10 @@ abstract class DelegateRConnection implements RConnection { } /** - * Implements standard seeking behavior. + * Implements standard seeking behavior.<br> + * <p> + * <it>Standard</it> means that there is a shared cursor between reading and writing operations. + * </p> */ public static long seek(SeekableByteChannel channel, long offset, SeekMode seekMode, @SuppressWarnings("unused") SeekRWMode seekRWMode) throws IOException { long position = channel.position(); @@ -365,13 +395,27 @@ abstract class DelegateRConnection implements RConnection { } public static boolean writeLinesHelper(WritableByteChannel out, RAbstractStringVector lines, String sep, Charset encoding) throws IOException { - boolean incomplete = false; - for (int i = 0; i < lines.getLength(); i++) { - final String line = lines.getDataAt(i); - incomplete = DelegateRConnection.writeStringHelper(out, line, false, encoding); - incomplete = DelegateRConnection.writeStringHelper(out, sep, false, encoding) || incomplete; + if (sep != null && sep.contains("\n")) { + // fast path: we know that the line is complete + final ByteBuffer nlBuf = ByteBuffer.wrap(sep.getBytes(encoding)); + for (int i = 0; i < lines.getLength(); i++) { + final String line = lines.getDataAt(i); + final ByteBuffer buf = ByteBuffer.wrap(line.getBytes(encoding)); + out.write(buf); + nlBuf.rewind(); + out.write(nlBuf); + } + return false; + } else { + // slow path: we have to scan every string if it contains a newline + boolean incomplete = false; + for (int i = 0; i < lines.getLength(); i++) { + final String line = lines.getDataAt(i); + incomplete = DelegateRConnection.writeStringHelper(out, line, false, encoding); + incomplete = DelegateRConnection.writeStringHelper(out, sep, false, encoding) || incomplete; + } + return incomplete; } - return incomplete; } @Override @@ -382,11 +426,190 @@ abstract class DelegateRConnection implements RConnection { /** * Creates the stream decoder on demand and returns it. */ - protected StreamDecoder getDecoder() throws IOException { - if (decoder == null) { - CharsetDecoder charsetEncoder = base.getEncoding().newDecoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE); - decoder = StreamDecoder.forDecoder(getChannel(), charsetEncoder, -1); + protected StreamDecoder getDecoder(int bufSize) { + CharsetDecoder charsetEncoder = base.getEncoding().newDecoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE); + return StreamDecoder.forDecoder(this, charsetEncoder, bufSize); + } + + @Override + public void truncate() throws IOException { + if (!isSeekable()) { + throw RError.error(RError.SHOW_CALLER, RError.Message.TRUNCATE_NOT_ENABLED); + } + throw RError.nyi(RError.SHOW_CALLER, "truncate"); + } + + @Override + public void writeBin(ByteBuffer buffer) throws IOException { + write(buffer); + } + + @Override + public void writeChar(String s, int pad, String eos, boolean useBytes) throws IOException { + DelegateRConnection.writeCharHelper(this, s, pad, eos); + } + + @Override + public void writeLines(RAbstractStringVector lines, String sep, boolean useBytes) throws IOException { + boolean incomplete = DelegateRConnection.writeLinesHelper(this, lines, sep, base.getEncoding()); + base.setIncomplete(incomplete); + } + + @Override + public void writeString(String s, boolean nl) throws IOException { + DelegateRConnection.writeStringHelper(this, s, nl, base.getEncoding()); + } + + @Override + public int read(ByteBuffer dst) throws IOException { + if (cache != null) { + final int bytesRequested = dst.remaining(); + int totalBytesRead = 0; + int bytesToRead = 0; + boolean eof; + do { + eof = ensureDataAvailable(dst.remaining()); + bytesToRead = Math.min(cache.remaining(), dst.remaining()); + cache.get(dst.array(), dst.position(), bytesToRead); + dst.position(dst.position() + bytesToRead); + totalBytesRead += bytesToRead; + } while (totalBytesRead < bytesRequested && bytesToRead > 0 && !eof); + return totalBytesRead == 0 && eof ? -1 : totalBytesRead; + } else { + return getChannel().read(dst); + } + } + + @Override + public int write(ByteBuffer src) throws IOException { + return getChannel().write(src); + } + + /** + * Reads one byte from the channel.<br> + * <p> + * Should basically do the same job as {@link #getc()} but is only used by internally by this + * class or subclasses an may therefore produce an inconsistent state over several calls. For + * example, updating the channel's cursor position can be collapsed. + * </p> + */ + protected int readInternal() throws IOException { + if (cache != null) { + ensureDataAvailable(1); + if (!cache.hasRemaining()) { + return -1; + } + // consider byte to be unsigned + return cache.get() & 0xFF; + } else { + + ByteBuffer buf = ByteBuffer.allocate(1); + int n = getChannel().read(buf); + if (n <= 0) { + return -1; + } + buf.flip(); + return buf.get() & 0xFF; + } + } + + private boolean ensureDataAvailable(int i) throws IOException { + assert cache != null; + if (cache.remaining() < i) { + byte[] rem = new byte[cache.remaining()]; + cache.get(rem); + assert !cache.hasRemaining(); + cache.clear(); + cache.put(rem); + int read = getChannel().read(cache); + cache.flip(); + return read == -1; + } + return false; + } + + /** + * Invalidates the read cache by dropping cached data.<br> + * <p> + * This method is most useful if an operation like {@code seek} is performed that destroys the + * order data is read. + * </p> + */ + protected void invalidateCache() { + if (cache != null) { + cache.clear(); + cache.flip(); } - return decoder; + } + + @Override + public int getc() throws IOException { + return readInternal(); + } + + @Override + public int readBin(ByteBuffer buffer) throws IOException { + return read(buffer); + } + + /** + * Reads null-terminated character strings from a {@link ReadableByteChannel}. + */ + @Override + public byte[] readBinChars() throws IOException { + int numRead = readInternal(); + if (numRead <= 0) { + return null; + } + int totalRead = 0; + byte[] buffer = new byte[64]; + while (true) { + buffer = DelegateRConnection.checkBuffer(buffer, totalRead); + buffer[totalRead++] = (byte) numRead; + if (numRead == 0) { + break; + } else if (numRead == -1) { + RError.warning(RError.SHOW_CALLER, RError.Message.INCOMPLETE_STRING_AT_EOF_DISCARDED); + return null; + } + numRead = readInternal(); + } + return buffer; + } + + @Override + public boolean canRead() { + return true; + } + + @Override + public boolean canWrite() { + return true; + } + + @Override + public void flush() throws IOException { + // nothing to do for channels + } + + @Override + public OutputStream getOutputStream() throws IOException { + return Channels.newOutputStream(this); + } + + @Override + public InputStream getInputStream() throws IOException { + return Channels.newInputStream(this); + } + + @Override + public void close() throws IOException { + getChannel().close(); + } + + @Override + public void closeAndDestroy() throws IOException { + base.closed = true; + close(); } } diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java index 70a2bac8df5e6cc84b12f1b876f627b4d56a0b26..d4b11c2d5723c037e1839c5f26b05f31b6e98bb5 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java @@ -23,10 +23,8 @@ package com.oracle.truffle.r.runtime.conn; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.nio.channels.Channels; import com.oracle.truffle.r.runtime.RError; import com.oracle.truffle.r.runtime.RInternalError; @@ -35,12 +33,14 @@ import com.oracle.truffle.r.runtime.data.model.RAbstractStringVector; public abstract class DelegateReadRConnection extends DelegateRConnection { - private final ByteBuffer tmp = ByteBuffer.allocate(1); - protected DelegateReadRConnection(BaseRConnection base) { super(base); } + protected DelegateReadRConnection(BaseRConnection base, int cacheSize) { + super(base, cacheSize); + } + @Override public void writeLines(RAbstractStringVector lines, String sep, boolean useBytes) throws IOException { throw new IOException(RError.Message.CANNOT_WRITE_CONNECTION.message); @@ -61,42 +61,6 @@ public abstract class DelegateReadRConnection extends DelegateRConnection { throw new IOException(RError.Message.CANNOT_WRITE_CONNECTION.message); } - @Override - public int getc() throws IOException { - if (isTextMode()) { - return getDecoder().read(); - } else { - tmp.clear(); - int nread = getChannel().read(tmp); - tmp.rewind(); - return nread > 0 ? tmp.get() : -1; - } - } - - @Override - public String readChar(int nchars, boolean useBytes) throws IOException { - if (useBytes) { - return DelegateRConnection.readCharHelper(nchars, getChannel()); - } else { - return DelegateRConnection.readCharHelper(nchars, getDecoder()); - } - } - - @Override - public int readBin(ByteBuffer buffer) throws IOException { - return getChannel().read(buffer); - } - - @Override - public byte[] readBinChars() throws IOException { - return DelegateRConnection.readBinCharsHelper(getChannel()); - } - - @Override - public void flush() { - // nothing to do when reading - } - @Override public OutputStream getOutputStream() { throw RInternalError.shouldNotReachHere(); @@ -113,19 +77,8 @@ public abstract class DelegateReadRConnection extends DelegateRConnection { } @Override - public InputStream getInputStream() throws IOException { - return Channels.newInputStream(getChannel()); - } - - @Override - public void close() throws IOException { - getChannel().close(); - } - - @Override - public void closeAndDestroy() throws IOException { - base.closed = true; - close(); + public void truncate() { + throw RError.error(RError.SHOW_CALLER, RError.Message.TRUNCATE_ONLY_WRITE_CONNECTION); } } diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java index 011cf49d1b9256813e2f4151b61cf8565dd1f695..c445251b02495f80207f0c2507bcc046e64eadb9 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java @@ -22,105 +22,16 @@ */ package com.oracle.truffle.r.runtime.conn; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; - import com.oracle.truffle.r.runtime.conn.ConnectionSupport.BaseRConnection; -import com.oracle.truffle.r.runtime.data.model.RAbstractStringVector; abstract class DelegateReadWriteRConnection extends DelegateRConnection { - private final ByteBuffer tmp = ByteBuffer.allocate(1); - protected DelegateReadWriteRConnection(BaseRConnection base) { super(base); } - @Override - public boolean canRead() { - return true; - } - - @Override - public boolean canWrite() { - return true; - } - - @Override - public int getc() throws IOException { - tmp.clear(); - int nread = getChannel().read(tmp); - tmp.rewind(); - return nread > 0 ? tmp.get() : -1; - } - - @Override - public String readChar(int nchars, boolean useBytes) throws IOException { - if (useBytes) { - return DelegateRConnection.readCharHelper(nchars, getChannel()); - } else { - return DelegateRConnection.readCharHelper(nchars, getDecoder()); - } - } - - @Override - public int readBin(ByteBuffer buffer) throws IOException { - return getChannel().read(buffer); - } - - @Override - public byte[] readBinChars() throws IOException { - return DelegateRConnection.readBinCharsHelper(getChannel()); - } - - @Override - public void flush() { - // nothing to do for channels - } - - @Override - public OutputStream getOutputStream() throws IOException { - return Channels.newOutputStream(getChannel()); - } - - @Override - public InputStream getInputStream() throws IOException { - return Channels.newInputStream(getChannel()); - } - - @Override - public void close() throws IOException { - getChannel().close(); - } - - @Override - public void closeAndDestroy() throws IOException { - base.closed = true; - close(); - } - - @Override - public void writeBin(ByteBuffer buffer) throws IOException { - getChannel().write(buffer); - } - - @Override - public void writeChar(String s, int pad, String eos, boolean useBytes) throws IOException { - DelegateRConnection.writeCharHelper(getChannel(), s, pad, eos); - } - - @Override - public void writeLines(RAbstractStringVector lines, String sep, boolean useBytes) throws IOException { - boolean incomplete = DelegateRConnection.writeLinesHelper(getChannel(), lines, sep, base.getEncoding()); - base.setIncomplete(incomplete); - } - - @Override - public void writeString(String s, boolean nl) throws IOException { - DelegateRConnection.writeStringHelper(getChannel(), s, nl, base.getEncoding()); + protected DelegateReadWriteRConnection(BaseRConnection base, int cacheSize) { + super(base, cacheSize); } } diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java index 4d4520d88cfea5b2ce808e579e45c8f4991fe4b5..16503501a0914f0e299d5e7c06f1eda99d0d4e88 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java @@ -24,19 +24,16 @@ package com.oracle.truffle.r.runtime.conn; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.nio.ByteBuffer; -import java.nio.channels.Channels; import com.oracle.truffle.r.runtime.RError; import com.oracle.truffle.r.runtime.RInternalError; import com.oracle.truffle.r.runtime.conn.ConnectionSupport.BaseRConnection; -import com.oracle.truffle.r.runtime.data.model.RAbstractStringVector; abstract class DelegateWriteRConnection extends DelegateRConnection { protected DelegateWriteRConnection(BaseRConnection base) { - super(base); + super(base, 0); } @Override @@ -78,48 +75,4 @@ abstract class DelegateWriteRConnection extends DelegateRConnection { public boolean canWrite() { return true; } - - @Override - public void closeAndDestroy() throws IOException { - base.closed = true; - close(); - } - - @Override - public void flush() throws IOException { - // channels don't need any flushing - } - - @Override - public void close() throws IOException { - flush(); - getChannel().close(); - } - - @Override - public OutputStream getOutputStream() throws IOException { - return Channels.newOutputStream(getChannel()); - } - - @Override - public void writeBin(ByteBuffer buffer) throws IOException { - getChannel().write(buffer); - } - - @Override - public void writeChar(String s, int pad, String eos, boolean useBytes) throws IOException { - DelegateRConnection.writeCharHelper(getChannel(), s, pad, eos); - } - - @Override - public void writeLines(RAbstractStringVector lines, String sep, boolean useBytes) throws IOException { - boolean incomplete = DelegateRConnection.writeLinesHelper(getChannel(), lines, sep, base.getEncoding()); - base.setIncomplete(incomplete); - } - - @Override - public void writeString(String s, boolean nl) throws IOException { - DelegateRConnection.writeStringHelper(getChannel(), s, nl, base.getEncoding()); - } - } diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java index a72e817ec18f761e35c1fc294f02669dc40240b2..1f009cd7a92008c6921c855867e791d3dab22540 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java @@ -54,7 +54,6 @@ import com.oracle.truffle.r.runtime.TempPathName; import com.oracle.truffle.r.runtime.conn.ConnectionSupport.AbstractOpenMode; import com.oracle.truffle.r.runtime.conn.ConnectionSupport.BasePathRConnection; import com.oracle.truffle.r.runtime.conn.ConnectionSupport.ConnectionClass; -import com.oracle.truffle.r.runtime.conn.ConnectionSupport.OpenMode; import com.oracle.truffle.r.runtime.data.model.RAbstractStringVector; public class FileConnections { @@ -140,8 +139,16 @@ public class FileConnections { delegate = new FileWriteBinaryConnection(base, false); break; case ReadWriteTrunc: + delegate = new FileReadWriteTextConnection(base, false); + break; case ReadWriteTruncBinary: - delegate = new FileReadWriteConnection(base); + delegate = new FileReadWriteBinaryConnection(base, false); + break; + case ReadAppend: + delegate = new FileReadWriteTextConnection(base, true); + break; + case ReadAppendBinary: + delegate = new FileReadWriteBinaryConnection(base, true); break; default: throw RError.nyi(RError.SHOW_CALLER2, "open mode: " + base.getOpenMode()); @@ -260,6 +267,11 @@ public class FileConnections { return true; } + @Override + public long seekInternal(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { + return DelegateRConnection.seek(channel, offset, seekMode, seekRWMode); + } + @Override public ByteChannel getChannel() { return channel; @@ -311,7 +323,64 @@ public class FileConnections { opts.add(StandardOpenOption.TRUNCATE_EXISTING); } channel = FileChannel.open(Paths.get(base.path), opts.toArray(new OpenOption[opts.size()])); + } + + @Override + public boolean isSeekable() { + return true; + } + + @Override + protected long seekInternal(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { + return DelegateRConnection.seek(channel, offset, seekMode, seekRWMode); + } + + @Override + public ByteChannel getChannel() { + return channel; + } + + @Override + public void truncate() throws IOException { + channel.truncate(channel.position()); + } + + } + + private static class FileReadWriteTextConnection extends DelegateReadWriteRConnection { + private final FileChannel channel; + private long readOffset; + private long writeOffset; + private SeekRWMode lastMode = SeekRWMode.READ; + + FileReadWriteTextConnection(BasePathRConnection base, boolean append) throws IOException { + super(base); + List<OpenOption> opts = new ArrayList<>(); + opts.add(StandardOpenOption.READ); + opts.add(StandardOpenOption.WRITE); + opts.add(StandardOpenOption.CREATE); + channel = FileChannel.open(Paths.get(base.path), opts.toArray(new OpenOption[opts.size()])); + if (append) { + writeOffset = channel.size(); + } else { + channel.truncate(0); + } + } + + @Override + public int getc() throws IOException { + setReadPosition(); + final int value = super.readInternal(); + if (value != -1) { + readOffset++; + } + return value; + } + + @Override + protected void updateReadOffset(int nBytesConsumed) { + readOffset += nBytesConsumed; } @Override @@ -319,14 +388,136 @@ public class FileConnections { return true; } + @Override + protected long seekInternal(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { + long result; + boolean set = true; + switch (seekMode) { + case ENQUIRE: + set = false; + break; + case START: + break; + default: + throw RError.nyi(RError.SHOW_CALLER, "seek mode"); + } + switch (seekRWMode) { + case LAST: + if (lastMode == SeekRWMode.READ) { + result = readOffset; + if (set) { + readOffset = offset; + } + } else { + result = writeOffset; + if (set) { + writeOffset = offset; + } + } + break; + case READ: + result = readOffset; + if (set) { + readOffset = offset; + } + break; + case WRITE: + result = writeOffset; + if (set) { + writeOffset = offset; + } + break; + default: + throw RError.nyi(RError.SHOW_CALLER, "seek mode"); + } + return result; + } + + @Override + public String[] readLines(int n, boolean warn, boolean skipNul) throws IOException { + setReadPosition(); + return super.readLines(n, warn, skipNul); + } + + @Override + public int readBin(ByteBuffer buffer) throws IOException { + setReadPosition(); + return super.readBin(buffer); + } + + @Override + public String readChar(int nchars, boolean useBytes) throws IOException { + setReadPosition(); + return super.readChar(nchars, useBytes); + } + + @Override + public byte[] readBinChars() throws IOException { + setReadPosition(); + return super.readBinChars(); + } + + @Override + public void close() throws IOException { + channel.close(); + } + + @Override + public void writeLines(RAbstractStringVector lines, String sep, boolean useBytes) throws IOException { + setWritePosition(); + super.writeLines(lines, sep, useBytes); + writeOffset = channel.position(); + } + + @Override + public void writeBin(ByteBuffer buffer) throws IOException { + setWritePosition(); + super.writeBin(buffer); + } + + @Override + public void writeChar(String s, int pad, String eos, boolean useBytes) throws IOException { + setWritePosition(); + super.writeChar(s, pad, eos, useBytes); + } + + @Override + public void writeString(String s, boolean nl) throws IOException { + setWritePosition(); + super.writeString(s, nl); + } + @Override public ByteChannel getChannel() { return channel; } + private void setReadPosition() throws IOException { + if (lastMode != SeekRWMode.READ) { + channel.position(readOffset); + lastMode = SeekRWMode.READ; + } + } + + private void setWritePosition() throws IOException { + if (lastMode != SeekRWMode.WRITE) { + channel.position(writeOffset); + lastMode = SeekRWMode.WRITE; + } + } + + @Override + public void truncate() throws IOException { + channel.truncate(writeOffset); + lastMode = SeekRWMode.WRITE; + // GnuR also freshly queries the file pointer. It may happen that the file pointer is + // different as expected. + writeOffset = channel.position(); + } + } - private static class FileReadWriteConnection extends DelegateReadWriteRConnection { + private static class FileReadWriteBinaryConnection extends DelegateReadWriteRConnection { /* * This is a minimal implementation to support one specific use in package installation. * @@ -339,38 +530,38 @@ public class FileConnections { private long writeOffset; private SeekRWMode lastMode = SeekRWMode.READ; - FileReadWriteConnection(BasePathRConnection base) throws IOException { + FileReadWriteBinaryConnection(BasePathRConnection base, boolean append) throws IOException { super(base); - OpenMode openMode = base.getOpenMode(); - String rafMode = null; - switch (openMode.abstractOpenMode) { - case ReadWriteTrunc: - case ReadWriteTruncBinary: - rafMode = "rw"; - break; - default: - throw RInternalError.shouldNotReachHere(); + raf = new RandomAccessFile(base.path, "rw"); + if (append) { + writeOffset = raf.length(); + } else { + raf.setLength(0); } - raf = new RandomAccessFile(base.path, rafMode); } @Override public int getc() throws IOException { - raf.seek(readOffset); - int value = raf.read(); + setReadPosition(); + final int value = raf.read(); if (value != -1) { readOffset++; } return value; } + @Override + protected void updateReadOffset(int nBytesConsumed) { + readOffset += nBytesConsumed; + } + @Override public boolean isSeekable() { return true; } @Override - public long seek(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { + protected long seekInternal(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { long result = raf.getFilePointer(); switch (seekMode) { case ENQUIRE: @@ -400,10 +591,42 @@ public class FileConnections { @Override public String[] readLines(int n, boolean warn, boolean skipNul) throws IOException { - raf.seek(readOffset); + setReadPosition(); return super.readLines(n, warn, skipNul); } + @Override + public int readBin(ByteBuffer buffer) throws IOException { + setReadPosition(); + return super.readBin(buffer); + } + + @Override + public String readChar(int nchars, boolean useBytes) throws IOException { + setReadPosition(); + return super.readChar(nchars, useBytes); + } + + @Override + public byte[] readBinChars() throws IOException { + setReadPosition(); + return super.readBinChars(); + } + + private void setReadPosition() throws IOException { + if (lastMode != SeekRWMode.READ) { + raf.seek(readOffset); + lastMode = SeekRWMode.READ; + } + } + + private void setWritePosition() throws IOException { + if (lastMode != SeekRWMode.WRITE) { + raf.seek(writeOffset); + lastMode = SeekRWMode.WRITE; + } + } + @Override public void close() throws IOException { raf.close(); @@ -411,22 +634,27 @@ public class FileConnections { @Override public void writeLines(RAbstractStringVector lines, String sep, boolean useBytes) throws IOException { - raf.seek(writeOffset); - byte[] sepData = sep.getBytes(base.getEncoding()); - for (int i = 0; i < lines.getLength(); i++) { - writeString(lines.getDataAt(i), false); - raf.write(sepData); - } + setWritePosition(); + super.writeLines(lines, sep, useBytes); writeOffset = raf.getFilePointer(); - lastMode = SeekRWMode.WRITE; + } + + @Override + public void writeBin(ByteBuffer buffer) throws IOException { + setWritePosition(); + super.writeBin(buffer); + } + + @Override + public void writeChar(String s, int pad, String eos, boolean useBytes) throws IOException { + setWritePosition(); + super.writeChar(s, pad, eos, useBytes); } @Override public void writeString(String s, boolean nl) throws IOException { - raf.write(s.getBytes(base.getEncoding())); - if (nl) { - raf.write(System.lineSeparator().getBytes(base.getEncoding())); - } + setWritePosition(); + super.writeString(s, nl); } @Override @@ -434,6 +662,15 @@ public class FileConnections { return raf.getChannel(); } + @Override + public void truncate() throws IOException { + raf.setLength(writeOffset); + lastMode = SeekRWMode.WRITE; + // GnuR also freshly queries the file pointer. It may happen that the file pointer is + // different as expected. + writeOffset = raf.getFilePointer(); + } + } private static class CompressedInputRConnection extends DelegateReadRConnection { @@ -479,7 +716,7 @@ public class FileConnections { } @Override - public long seek(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { + protected long seekInternal(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { if (seekable) { // TODO GZIP is basically seekable; however, the output stream does not allow any // seeking @@ -499,6 +736,11 @@ public class FileConnections { public ByteChannel getChannel() { return channel; } + + @Override + public void truncate() throws IOException { + throw RError.nyi(RError.SHOW_CALLER, "truncating compressed file not"); + } } private static class BZip2OutputRConnection extends CompressedOutputRConnection { diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/RConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/RConnection.java index 341d12e3ba8fbdd49f07a197ab28d790f74acbb1..1314faf88542c234a282bf6b2c961586f455ea08 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/RConnection.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/RConnection.java @@ -139,7 +139,10 @@ public interface RConnection extends AutoCloseable { long seek(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException; /** - * Internal support for reading one character at a time. + * Internal support for reading one byte at a time.<br> + * <p> + * <b>NOTE:</b> This method is also used from native code. Do not change the signature! + * </p> */ int getc() throws IOException; @@ -167,7 +170,7 @@ public interface RConnection extends AutoCloseable { * @param s string to output * @param pad number of (zero) pad bytes * @param eos string to append to s - * @param useBytes TODO + * @param useBytes */ void writeChar(String s, int pad, String eos, boolean useBytes) throws IOException; @@ -217,5 +220,10 @@ public interface RConnection extends AutoCloseable { */ boolean isOpen(); + /** + * Truncates the connection (if possible). + */ + void truncate() throws IOException; + void pushBack(RAbstractStringVector lines, boolean addNewLine); } diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/RawConnections.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/RawConnections.java index 3213cfc97fe70ac6131796149f234e9f0b46c4ad..3abb2e1bca5d47ddbe3ca820d2d6bd4952a22cde 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/RawConnections.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/RawConnections.java @@ -118,7 +118,7 @@ public class RawConnections { private SeekableMemoryByteChannel channel; RawReadRConnection(BaseRConnection base, SeekableMemoryByteChannel channel) { - super(base); + super(base, 0); this.channel = channel; try { channel.position(0L); @@ -148,7 +148,7 @@ public class RawConnections { } @Override - public long seek(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { + public long seekInternal(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { return RawRConnection.seek(channel, offset, seekMode, seekRWMode); } @@ -179,7 +179,7 @@ public class RawConnections { } @Override - public long seek(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { + protected long seekInternal(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { return RawRConnection.seek(channel, offset, seekMode, seekRWMode); } @@ -192,6 +192,11 @@ public class RawConnections { public boolean isSeekable() { return true; } + + @Override + public void truncate() throws IOException { + channel.truncate(channel.position()); + } } private static class RawReadWriteConnection extends DelegateReadWriteRConnection { @@ -199,7 +204,7 @@ public class RawConnections { private final SeekableMemoryByteChannel channel; protected RawReadWriteConnection(BaseRConnection base, SeekableMemoryByteChannel channel, boolean append) { - super(base); + super(base, 0); this.channel = channel; if (append) { try { @@ -211,7 +216,7 @@ public class RawConnections { } @Override - public long seek(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { + protected long seekInternal(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { return RawRConnection.seek(channel, offset, seekMode, seekRWMode); } @@ -225,6 +230,10 @@ public class RawConnections { return true; } + @Override + public void truncate() throws IOException { + channel.truncate(channel.position()); + } } } diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/SeekableMemoryByteChannel.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/SeekableMemoryByteChannel.java index c625c855a30a84a941f19ceed6f05d377e14fe6f..256b4e4a99fa60e7db6cdf5f274bd5e1eca561b5 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/SeekableMemoryByteChannel.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/SeekableMemoryByteChannel.java @@ -200,12 +200,27 @@ public class SeekableMemoryByteChannel implements SeekableByteChannel { @Override public SeekableByteChannel truncate(long size) throws IOException { + if (size < 0) { + throw new IllegalArgumentException("'size' cannot be negative"); + } + // avoid security leak by nulling previous data - Arrays.fill(buf, (byte) 0); + final int from; + final int to; + if (size < endPos) { + from = (int) (size - offset); + to = (int) (endPos - offset); + } else { + to = (int) (size - offset); + from = (int) (endPos - offset); + + // need to enlarge buffer + ensureCapacity(to); + } + Arrays.fill(buf, from, to, (byte) 0); - offset = 0; - position = 0; - endPos = 0; + position = Math.min(position, size); + endPos = size; return this; } diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/SocketConnections.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/SocketConnections.java index 2f978e6fa943aabad0c82ecf873bfd52a925a407..a4ea565e9ed3f72c8a7451470a38b94a1bcf78f5 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/SocketConnections.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/SocketConnections.java @@ -82,7 +82,7 @@ public class SocketConnections { protected final RSocketConnection thisBase; protected RSocketReadWriteConnection(RSocketConnection base) { - super(base); + super(base, 0); this.thisBase = base; } @@ -118,7 +118,7 @@ public class SocketConnections { private SocketChannel socketChannel; protected RSocketReadWriteNonBlockConnection(RSocketConnection base) { - super(base); + super(base, 0); } protected void openStreams(Socket socketArg) throws IOException { diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/TextConnections.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/TextConnections.java index e789830946419241102ac19c33223ddb19c7cb50..b1e4629053bf14d0dc17abbeb8d257b162d0ffef 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/TextConnections.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/TextConnections.java @@ -92,7 +92,7 @@ public class TextConnections { private int index; TextReadRConnection(TextRConnection base, RAbstractStringVector object) { - super(base); + super(base, 0); assert object != null; StringBuffer sb = new StringBuffer(); for (int i = 0; i < object.getLength(); i++) { @@ -314,7 +314,7 @@ public class TextConnections { } @Override - public long seek(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { + protected long seekInternal(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { throw RError.error(RError.SHOW_CALLER, RError.Message.SEEK_NOT_RELEVANT_FOR_TEXT_CON); } diff --git a/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/ExpectedTestOutput.test b/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/ExpectedTestOutput.test index e1ec10e788bfcd9b6a52f7be5bc44255d27021af..319431438de984e29e13ed3614c32b17a6f564f8 100644 --- a/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/ExpectedTestOutput.test +++ b/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/ExpectedTestOutput.test @@ -73348,6 +73348,13 @@ $`can write` [1] "no" +##com.oracle.truffle.r.test.library.base.TestConnections.testFileWriteReadBin# +#{ cat('abc', file = 'tmptest/com.oracle.truffle.r.test.library.base.conn/wb3'); readBin(file('tmptest/com.oracle.truffle.r.test.library.base.conn/wb3', 'rb'), character(), 2) } +character(0) +Warning message: +In readBin(file("tmptest/com.oracle.truffle.r.test.library.base.conn/wb3", : + incomplete string at end of file has been discarded + ##com.oracle.truffle.r.test.library.base.TestConnections.testFileWriteReadBin# #{ readBin(file("tmptest/com.oracle.truffle.r.test.library.base.conn/wb1", "rb"), 3) } numeric(0) @@ -73355,6 +73362,10 @@ numeric(0) ##com.oracle.truffle.r.test.library.base.TestConnections.testFileWriteReadBin# #{ writeBin("abc", file("tmptest/com.oracle.truffle.r.test.library.base.conn/wb1", open="wb")) } +##com.oracle.truffle.r.test.library.base.TestConnections.testFileWriteReadBin# +#{ zz <- file("tmptest/com.oracle.truffle.r.test.library.base.conn/wb2", open="wb"); writeChar("abc", zz); close(zz); readBin(file("tmptest/com.oracle.truffle.r.test.library.base.conn/wb2", "rb"), character(), 4) } +[1] "abc" + ##com.oracle.truffle.r.test.library.base.TestConnections.testFileWriteReadChar# #{ readChar(file("tmptest/com.oracle.truffle.r.test.library.base.conn/wc1"), 3) } [1] "abc" @@ -73717,6 +73728,48 @@ Error in textConnection(NULL, "r") : invalid 'text' argument #{ con <- textConnection(c("1", "2", "3","4")); readLines(con, 2); readLines(con, 2); readLines(con, 2) } character(0) +##com.oracle.truffle.r.test.library.base.TestConnections.testTruncate# +#fn <- '__tmp_77253842367367'; zz <- file(fn, 'w'); writeLines(c('Hello', 'wonderful', 'World'), zz); seek(zz, 0); truncate(zz); close(zz); readLines(file(fn)); unlink(fn) +[1] 22 +NULL +character(0) + +##com.oracle.truffle.r.test.library.base.TestConnections.testTruncate# +#fn <- '__tmp_98723669834556'; zz <- file(fn, 'w'); writeLines(c('Hello', 'wonderful', 'World'), zz); close(zz); zz <- file(fn, 'r'); truncate(zz); unlink(fn) +Error in truncate.connection(zz) : + can only truncate connections open for writing + +##com.oracle.truffle.r.test.library.base.TestConnections.testTruncate# +#truncate(fifo('__fifo_872636743', 'w+', blocking=T)); unlink('__fifo_872636743') +Error in truncate.connection(fifo("__fifo_872636743", "w+", blocking = T)) : + truncation not enabled for this connection + +##com.oracle.truffle.r.test.library.base.TestConnections.testTruncate# +#truncate(fifo('__fifo_982346798', 'r', blocking=T)); unlink('__fifo_982346798') +Error in fifo("__fifo_982346798", "r", blocking = T) : + cannot open the connection +In addition: Warning message: +In fifo("__fifo_982346798", "r", blocking = T) : + cannot open fifo '__fifo_982346798' + +##com.oracle.truffle.r.test.library.base.TestConnections.testTruncate# +#truncate(pipe('ls')) +Error in truncate.connection(pipe("ls")) : + can only truncate an open connection + +##com.oracle.truffle.r.test.library.base.TestConnections.testTruncate# +#zz <- file(''); writeLines(c('Hello', 'wonderful', 'World'), zz); seek(zz, 0); truncate(zz); flush(zz); readLines(zz) +[1] 22 +NULL +character(0) + +##com.oracle.truffle.r.test.library.base.TestConnections.testTruncate# +#zz <- rawConnection(raw(0), 'r+'); writeLines(c('hello', 'world'), zz); rawConnectionValue(zz); seek(zz, 5); truncate(zz); rawConnectionValue(zz); close(zz) + [1] 68 65 6c 6c 6f 0a 77 6f 72 6c 64 0a +[1] 12 +NULL +[1] 68 65 6c 6c 6f + ##com.oracle.truffle.r.test.library.base.TestConnections.testWriteTextConnection# #c <- textConnection('out', 'w'); cat('testtext', file=c); isIncomplete(c); cat('testtext2\n', file=c); isIncomplete(c); close(c); out [1] TRUE diff --git a/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/library/base/TestConnections.java b/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/library/base/TestConnections.java index 4bae7a32254dccdc3c6751ec7f1d83d58cf039ad..2f34c08b2f0cb687a80b721cbfc626f499763f11 100644 --- a/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/library/base/TestConnections.java +++ b/com.oracle.truffle.r.test/src/com/oracle/truffle/r/test/library/base/TestConnections.java @@ -93,10 +93,15 @@ public class TestConnections extends TestRBase { assertEval(TestBase.template("{ readChar(file(\"%0\"), 3) }", testDir.subDir("wc1"))); } - @Test + @Test(timeout = 1000) public void testFileWriteReadBin() { assertEval(TestBase.template("{ writeBin(\"abc\", file(\"%0\", open=\"wb\")) }", testDir.subDir("wb1"))); assertEval(TestBase.template("{ readBin(file(\"%0\", \"rb\"), 3) }", testDir.subDir("wb1"))); + + assertEval(TestBase.template("{ zz <- file(\"%0\", open=\"wb\"); writeChar(\"abc\", zz); close(zz); readBin(file(\"%0\", \"rb\"), character(), 4) }", testDir.subDir("wb2"))); + + // incomplete line at the end of file + assertEval(TestBase.template("{ cat('abc', file = '%0'); readBin(file('%0', 'rb'), character(), 2) }", testDir.subDir("wb3"))); } @Test @@ -237,7 +242,17 @@ public class TestConnections extends TestRBase { @Test public void testFifoOpenInexisting() { assertEval("capabilities(\"fifo\")"); - assertEval(Output.IgnoreErrorContext, Output.IgnoreWarningContext, "{ fn <- '___fifo_2367253765'; zz <- fifo(fn, 'r', blocking = TRUE); close(zz); unlink(fn) }"); + assertEval("{ fn <- '___fifo_2367253765'; zz <- fifo(fn, 'r', blocking = TRUE); close(zz); unlink(fn) }"); + } + + public void testTruncate() { + assertEval("truncate(pipe('ls'))"); + assertEval("zz <- file(''); writeLines(c('Hello', 'wonderful', 'World'), zz); seek(zz, 0); truncate(zz); flush(zz); readLines(zz)"); + assertEval("fn <- '__tmp_77253842367367'; zz <- file(fn, 'w'); writeLines(c('Hello', 'wonderful', 'World'), zz); seek(zz, 0); truncate(zz); close(zz); readLines(file(fn)); unlink(fn)"); + assertEval("fn <- '__tmp_98723669834556'; zz <- file(fn, 'w'); writeLines(c('Hello', 'wonderful', 'World'), zz); close(zz); zz <- file(fn, 'r'); truncate(zz); unlink(fn)"); + assertEval("zz <- rawConnection(raw(0), 'r+'); writeLines(c('hello', 'world'), zz); rawConnectionValue(zz); seek(zz, 5); truncate(zz); rawConnectionValue(zz); close(zz)"); + assertEval("truncate(fifo('__fifo_872636743', 'w+', blocking=T)); unlink('__fifo_872636743')"); + assertEval("truncate(fifo('__fifo_982346798', 'r', blocking=T)); unlink('__fifo_982346798')"); } private static final String[] LVAL = arr("T", "F");