diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/ConnectionSupport.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/ConnectionSupport.java index 293e28567739091b32a7506abbcfbda3abda5cb7..3ff5c7dff14f56aecb64e57d5344efefc5047db9 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/ConnectionSupport.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/ConnectionSupport.java @@ -409,7 +409,7 @@ public class ConnectionSupport { } @Override - public int getc() throws IOException { + public int read() throws IOException { throw RInternalError.shouldNotReachHere("INVALID CONNECTION"); } @@ -839,9 +839,9 @@ public class ConnectionSupport { } @Override - public int getc() throws IOException { + public int read() throws IOException { checkOpen(); - return theConnection.getc(); + return theConnection.read(); } public long seekInternal(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java index 37fd174e7a7eaefd2e00a22275014b1815c2d6b0..1d6159c5458cf331fbe481c77474aeafa261fcff 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateRConnection.java @@ -24,8 +24,11 @@ package com.oracle.truffle.r.runtime.conn; import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.Reader; import java.nio.ByteBuffer; +import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; @@ -54,7 +57,7 @@ import sun.nio.cs.StreamDecoder; */ abstract class DelegateRConnection implements RConnection { protected final BaseRConnection base; - private StreamDecoder decoder; + final ByteBuffer tmp = ByteBuffer.allocate(1); DelegateRConnection(BaseRConnection base) { this.base = Objects.requireNonNull(base); @@ -101,7 +104,7 @@ abstract class DelegateRConnection implements RConnection { base.setIncomplete(false); ArrayList<String> lines = new ArrayList<>(); int totalRead = 0; - char[] buffer = new char[64]; + byte[] buffer = new byte[64]; int pushBack = 0; boolean nullRead = false; while (true) { @@ -110,7 +113,7 @@ abstract class DelegateRConnection implements RConnection { ch = pushBack; pushBack = 0; } else { - ch = getc(); + ch = read(); } boolean lineEnd = false; if (ch < 0) { @@ -119,7 +122,7 @@ abstract class DelegateRConnection implements RConnection { * GnuR says if non-blocking and in text mode, silently push back incomplete * lines, otherwise keep data and output warning. */ - final String incompleteFinalLine = new String(buffer, 0, totalRead); + final String incompleteFinalLine = new String(buffer, 0, totalRead, base.getEncoding()); if (!base.isBlocking() && base.isTextMode()) { base.pushBack(RDataFactory.createStringVector(incompleteFinalLine), false); base.setIncomplete(true); @@ -136,7 +139,7 @@ abstract class DelegateRConnection implements RConnection { lineEnd = true; } else if (ch == '\r') { lineEnd = true; - ch = getc(); + ch = read(); if (ch == '\n') { // swallow the trailing lf } else { @@ -149,7 +152,7 @@ abstract class DelegateRConnection implements RConnection { } } if (lineEnd) { - lines.add(new String(buffer, 0, totalRead)); + lines.add(new String(buffer, 0, totalRead, base.getEncoding())); if (n > 0 && lines.size() == n) { break; } @@ -158,7 +161,7 @@ abstract class DelegateRConnection implements RConnection { } else { if (!nullRead) { buffer = DelegateRConnection.checkBuffer(buffer, totalRead); - buffer[totalRead++] = (char) (ch & 0xFFFF); + buffer[totalRead++] = (byte) (ch & 0xFF); } if (skipNul) { nullRead = false; @@ -170,6 +173,15 @@ abstract class DelegateRConnection implements RConnection { return result; } + @Override + public String readChar(int nchars, boolean useBytes) throws IOException { + if (useBytes) { + return DelegateRConnection.readCharHelper(nchars, getChannel()); + } else { + return DelegateRConnection.readCharHelper(nchars, getDecoder(nchars)); + } + } + /** * Writes a string to a channel. * @@ -328,17 +340,6 @@ abstract class DelegateRConnection implements RConnection { return new String(chars, 0, j); } -// @Override -// public long seek(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException { -// -// -// // need to reset the stream decoder since position changed -// reinitDecoder(); -// } -// -// protected abstract long seekInternal(long offset, SeekMode seekMode, SeekRWMode seekRWMode) -// throws IOException; - /** * Implements standard seeking behavior.<br> * <p> @@ -365,19 +366,6 @@ abstract class DelegateRConnection implements RConnection { return position; } - /** - * Enlarges the buffer if necessary. - */ - private static char[] checkBuffer(char[] buffer, int n) { - if (n > buffer.length - 1) { - char[] newBuffer = new char[buffer.length + buffer.length / 2]; - System.arraycopy(buffer, 0, newBuffer, 0, buffer.length); - return newBuffer; - } else { - return buffer; - } - } - /** * Enlarges the buffer if necessary. */ @@ -423,16 +411,9 @@ abstract class DelegateRConnection implements RConnection { /** * Creates the stream decoder on demand and returns it. */ - protected StreamDecoder getDecoder() throws IOException { - if (decoder == null) { - initDecoder(-1); - } - return decoder; - } - - protected void initDecoder(int bufSize) throws IOException { + protected StreamDecoder getDecoder(int bufSize) throws IOException { CharsetDecoder charsetEncoder = base.getEncoding().newDecoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE); - decoder = StreamDecoder.forDecoder(getChannel(), charsetEncoder, bufSize); + return StreamDecoder.forDecoder(getChannel(), charsetEncoder, bufSize); } @Override @@ -442,4 +423,79 @@ abstract class DelegateRConnection implements RConnection { } throw RError.nyi(RError.SHOW_CALLER, "truncate"); } + + @Override + public void writeBin(ByteBuffer buffer) throws IOException { + getChannel().write(buffer); + } + + @Override + public void writeChar(String s, int pad, String eos, boolean useBytes) throws IOException { + DelegateRConnection.writeCharHelper(getChannel(), s, pad, eos); + } + + @Override + public void writeLines(RAbstractStringVector lines, String sep, boolean useBytes) throws IOException { + boolean incomplete = DelegateRConnection.writeLinesHelper(getChannel(), lines, sep, base.getEncoding()); + base.setIncomplete(incomplete); + } + + @Override + public void writeString(String s, boolean nl) throws IOException { + DelegateRConnection.writeStringHelper(getChannel(), s, nl, base.getEncoding()); + } + + @Override + public int read() throws IOException { + tmp.clear(); + int nread = getChannel().read(tmp); + tmp.rewind(); + return nread > 0 ? tmp.get() & 0xFF : -1; + } + + @Override + public int readBin(ByteBuffer buffer) throws IOException { + return getChannel().read(buffer); + } + + @Override + public byte[] readBinChars() throws IOException { + return DelegateRConnection.readBinCharsHelper(getChannel()); + } + + @Override + public boolean canRead() { + return true; + } + + @Override + public boolean canWrite() { + return true; + } + + @Override + public void flush() throws IOException { + // nothing to do for channels + } + + @Override + public OutputStream getOutputStream() throws IOException { + return Channels.newOutputStream(getChannel()); + } + + @Override + public InputStream getInputStream() throws IOException { + return Channels.newInputStream(getChannel()); + } + + @Override + public void close() throws IOException { + getChannel().close(); + } + + @Override + public void closeAndDestroy() throws IOException { + base.closed = true; + close(); + } } diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java index ceef89ca962c443a3ea6393c393d9c0e624bfd25..dea881f3c2e6c1029430675809bd74e617cc01b5 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadRConnection.java @@ -23,10 +23,8 @@ package com.oracle.truffle.r.runtime.conn; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.nio.channels.Channels; import com.oracle.truffle.r.runtime.RError; import com.oracle.truffle.r.runtime.RInternalError; @@ -35,8 +33,6 @@ import com.oracle.truffle.r.runtime.data.model.RAbstractStringVector; public abstract class DelegateReadRConnection extends DelegateRConnection { - private final ByteBuffer tmp = ByteBuffer.allocate(1); - protected DelegateReadRConnection(BaseRConnection base) { super(base); } @@ -61,42 +57,6 @@ public abstract class DelegateReadRConnection extends DelegateRConnection { throw new IOException(RError.Message.CANNOT_WRITE_CONNECTION.message); } - @Override - public int getc() throws IOException { - if (isTextMode()) { - return getDecoder().read(); - } else { - tmp.clear(); - int nread = getChannel().read(tmp); - tmp.rewind(); - return nread > 0 ? tmp.get() : -1; - } - } - - @Override - public String readChar(int nchars, boolean useBytes) throws IOException { - if (useBytes) { - return DelegateRConnection.readCharHelper(nchars, getChannel()); - } else { - return DelegateRConnection.readCharHelper(nchars, getDecoder()); - } - } - - @Override - public int readBin(ByteBuffer buffer) throws IOException { - return getChannel().read(buffer); - } - - @Override - public byte[] readBinChars() throws IOException { - return DelegateRConnection.readBinCharsHelper(getChannel()); - } - - @Override - public void flush() { - // nothing to do when reading - } - @Override public OutputStream getOutputStream() { throw RInternalError.shouldNotReachHere(); @@ -112,22 +72,6 @@ public abstract class DelegateReadRConnection extends DelegateRConnection { return false; } - @Override - public InputStream getInputStream() throws IOException { - return Channels.newInputStream(getChannel()); - } - - @Override - public void close() throws IOException { - getChannel().close(); - } - - @Override - public void closeAndDestroy() throws IOException { - base.closed = true; - close(); - } - @Override public void truncate() { throw RError.error(RError.SHOW_CALLER, RError.Message.TRUNCATE_ONLY_WRITE_CONNECTION); diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java index 7154492f4e9abc21b9d8a52ab6574e0bb1c3730a..f8345bc9ab84a343d72b7709b03e4bd70d22ccb4 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateReadWriteRConnection.java @@ -22,109 +22,12 @@ */ package com.oracle.truffle.r.runtime.conn; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; - import com.oracle.truffle.r.runtime.conn.ConnectionSupport.BaseRConnection; -import com.oracle.truffle.r.runtime.data.model.RAbstractStringVector; abstract class DelegateReadWriteRConnection extends DelegateRConnection { - private final ByteBuffer tmp = ByteBuffer.allocate(1); - protected DelegateReadWriteRConnection(BaseRConnection base) { super(base); } - @Override - public boolean canRead() { - return true; - } - - @Override - public boolean canWrite() { - return true; - } - - @Override - public int getc() throws IOException { - if (isTextMode()) { - return getDecoder().read(); - } else { - tmp.clear(); - int nread = getChannel().read(tmp); - tmp.rewind(); - return nread > 0 ? tmp.get() : -1; - } - } - - @Override - public String readChar(int nchars, boolean useBytes) throws IOException { - if (useBytes) { - return DelegateRConnection.readCharHelper(nchars, getChannel()); - } else { - return DelegateRConnection.readCharHelper(nchars, getDecoder()); - } - } - - @Override - public int readBin(ByteBuffer buffer) throws IOException { - return getChannel().read(buffer); - } - - @Override - public byte[] readBinChars() throws IOException { - return DelegateRConnection.readBinCharsHelper(getChannel()); - } - - @Override - public void flush() { - // nothing to do for channels - } - - @Override - public OutputStream getOutputStream() throws IOException { - return Channels.newOutputStream(getChannel()); - } - - @Override - public InputStream getInputStream() throws IOException { - return Channels.newInputStream(getChannel()); - } - - @Override - public void close() throws IOException { - getChannel().close(); - } - - @Override - public void closeAndDestroy() throws IOException { - base.closed = true; - close(); - } - - @Override - public void writeBin(ByteBuffer buffer) throws IOException { - getChannel().write(buffer); - } - - @Override - public void writeChar(String s, int pad, String eos, boolean useBytes) throws IOException { - DelegateRConnection.writeCharHelper(getChannel(), s, pad, eos); - } - - @Override - public void writeLines(RAbstractStringVector lines, String sep, boolean useBytes) throws IOException { - boolean incomplete = DelegateRConnection.writeLinesHelper(getChannel(), lines, sep, base.getEncoding()); - base.setIncomplete(incomplete); - } - - @Override - public void writeString(String s, boolean nl) throws IOException { - DelegateRConnection.writeStringHelper(getChannel(), s, nl, base.getEncoding()); - } - } diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java index 4d4520d88cfea5b2ce808e579e45c8f4991fe4b5..7a7b12112143d10bff6b330a750ae78cbadb7c6c 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/DelegateWriteRConnection.java @@ -24,14 +24,11 @@ package com.oracle.truffle.r.runtime.conn; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.nio.ByteBuffer; -import java.nio.channels.Channels; import com.oracle.truffle.r.runtime.RError; import com.oracle.truffle.r.runtime.RInternalError; import com.oracle.truffle.r.runtime.conn.ConnectionSupport.BaseRConnection; -import com.oracle.truffle.r.runtime.data.model.RAbstractStringVector; abstract class DelegateWriteRConnection extends DelegateRConnection { @@ -60,7 +57,7 @@ abstract class DelegateWriteRConnection extends DelegateRConnection { } @Override - public int getc() throws IOException { + public int read() throws IOException { throw new IOException(RError.Message.CANNOT_READ_CONNECTION.message); } @@ -78,48 +75,4 @@ abstract class DelegateWriteRConnection extends DelegateRConnection { public boolean canWrite() { return true; } - - @Override - public void closeAndDestroy() throws IOException { - base.closed = true; - close(); - } - - @Override - public void flush() throws IOException { - // channels don't need any flushing - } - - @Override - public void close() throws IOException { - flush(); - getChannel().close(); - } - - @Override - public OutputStream getOutputStream() throws IOException { - return Channels.newOutputStream(getChannel()); - } - - @Override - public void writeBin(ByteBuffer buffer) throws IOException { - getChannel().write(buffer); - } - - @Override - public void writeChar(String s, int pad, String eos, boolean useBytes) throws IOException { - DelegateRConnection.writeCharHelper(getChannel(), s, pad, eos); - } - - @Override - public void writeLines(RAbstractStringVector lines, String sep, boolean useBytes) throws IOException { - boolean incomplete = DelegateRConnection.writeLinesHelper(getChannel(), lines, sep, base.getEncoding()); - base.setIncomplete(incomplete); - } - - @Override - public void writeString(String s, boolean nl) throws IOException { - DelegateRConnection.writeStringHelper(getChannel(), s, nl, base.getEncoding()); - } - } diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java index dd879051c36106f20103ef951132a3c44363784f..6c6e7b35febb77d4bf061fe52a3d5b28d43de7a5 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/FileConnections.java @@ -31,11 +31,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.RandomAccessFile; import java.nio.ByteBuffer; -import java.nio.CharBuffer; import java.nio.channels.ByteChannel; import java.nio.channels.FileChannel; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.CharsetEncoder; import java.nio.file.OpenOption; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; @@ -347,7 +344,6 @@ public class FileConnections { private static class FileReadWriteTextConnection extends DelegateReadWriteRConnection { private final FileChannel channel; - private final CharsetEncoder encoder; private long readOffset; private long writeOffset; private SeekRWMode lastMode = SeekRWMode.READ; @@ -360,26 +356,18 @@ public class FileConnections { opts.add(StandardOpenOption.CREATE); opts.add(StandardOpenOption.TRUNCATE_EXISTING); channel = FileChannel.open(Paths.get(base.path), opts.toArray(new OpenOption[opts.size()])); - encoder = base.getEncoding().newEncoder(); } @Override - public int getc() throws IOException { + public int read() throws IOException { setReadPosition(); - int value = super.getc(); + final int value = super.read(); if (value != -1) { - readOffset += getNumBytes(value); + readOffset++; } return value; } - private long getNumBytes(int value) throws CharacterCodingException { - // TODO We need to get the information about how much bytes have been consumed from the - // stream decoder. This is really bad! - ByteBuffer encode = encoder.encode(CharBuffer.wrap(new char[]{(char) value})); - return encode.position(); - } - @Override public boolean isSeekable() { return true; @@ -392,6 +380,7 @@ public class FileConnections { switch (seekMode) { case ENQUIRE: set = false; + break; case START: break; default: @@ -456,9 +445,6 @@ public class FileConnections { if (lastMode != SeekRWMode.READ) { channel.position(readOffset); lastMode = SeekRWMode.READ; - - // re-initialize stream decoder if position changed - initDecoder(1); } } @@ -504,9 +490,9 @@ public class FileConnections { } @Override - public int getc() throws IOException { + public int read() throws IOException { setReadPosition(); - int value = super.getc(); + final int value = raf.read(); if (value != -1) { readOffset++; } @@ -575,9 +561,6 @@ public class FileConnections { if (lastMode != SeekRWMode.READ) { raf.seek(readOffset); lastMode = SeekRWMode.READ; - - // re-initialize stream decoder if position changed - initDecoder(1); } } diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/RConnection.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/RConnection.java index e6a345e5a652f4c3d04e84127db1cdc9d9565024..4ea9d1edf89e9eae023fe6c6b36662ba4efc3f11 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/RConnection.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/RConnection.java @@ -139,9 +139,9 @@ public interface RConnection extends AutoCloseable { long seek(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException; /** - * Internal support for reading one character at a time. + * Internal support for reading one byte at a time. */ - int getc() throws IOException; + int read() throws IOException; /** * Write the {@code lines} to the connection, with {@code sep} appended after each "line". N.B. @@ -167,7 +167,7 @@ public interface RConnection extends AutoCloseable { * @param s string to output * @param pad number of (zero) pad bytes * @param eos string to append to s - * @param useBytes TODO + * @param useBytes */ void writeChar(String s, int pad, String eos, boolean useBytes) throws IOException; diff --git a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/StdConnections.java b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/StdConnections.java index 6150dc902b2cf11456b38457c64e489a62bf8f12..648995d525cd95d836898925423ad68392c09b06 100644 --- a/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/StdConnections.java +++ b/com.oracle.truffle.r.runtime/src/com/oracle/truffle/r/runtime/conn/StdConnections.java @@ -215,7 +215,7 @@ public class StdConnections { } @Override - public int getc() throws IOException { + public int read() throws IOException { throw RInternalError.unimplemented("stdin getc"); } } @@ -240,7 +240,7 @@ public class StdConnections { } @Override - public int getc() throws IOException { + public int read() throws IOException { throw new IOException(RError.Message.CANNOT_READ_CONNECTION.message); } }