Skip to content
Snippets Groups Projects
Commit 47cbb1a2 authored by Florian Angerer's avatar Florian Angerer
Browse files

Reduced buffer size and tried to find reason for segfault.

Also added cache invalidation calls when seeking.
parent a449d007
Branches
No related tags found
No related merge requests found
......@@ -57,7 +57,7 @@ import sun.nio.cs.StreamDecoder;
* </p>
*/
abstract class DelegateRConnection implements RConnection, ByteChannel {
private static final int DEFAULT_CACHE_SIZE = 64 * 1024;
private static final int DEFAULT_CACHE_SIZE = 16 * 1024;
protected final BaseRConnection base;
private final ByteBuffer cache;
......@@ -449,14 +449,16 @@ abstract class DelegateRConnection implements RConnection, ByteChannel {
final int bytesRequested = dst.remaining();
int totalBytesRead = 0;
int bytesToRead = 0;
boolean eof;
do {
ensureDataAvailable(dst.remaining());
eof = ensureDataAvailable(dst.remaining());
bytesToRead = Math.min(cache.remaining(), dst.remaining());
cache.get(dst.array(), dst.position(), bytesToRead);
dst.position(dst.position() + bytesToRead);
totalBytesRead += bytesToRead;
} while (totalBytesRead < bytesRequested && bytesToRead > 0);
return totalBytesRead;
return totalBytesRead == 0 && eof ? -1 : totalBytesRead;
// return getChannel().read(dst);
}
@Override
......@@ -473,24 +475,34 @@ abstract class DelegateRConnection implements RConnection, ByteChannel {
* </p>
*/
protected int readInternal() throws IOException {
ensureDataAvailable(1L);
ensureDataAvailable(1);
if (!cache.hasRemaining()) {
return -1;
}
// consider byte to be unsigned
return cache.get() & 0xFF;
// ByteBuffer buf = ByteBuffer.allocate(1);
// int n = getChannel().read(buf);
// if (n <= 0) {
// return -1;
// }
// buf.flip();
// return buf.get() & 0xFF;
}
private void ensureDataAvailable(long i) throws IOException {
private boolean ensureDataAvailable(int i) throws IOException {
if (cache.remaining() < i) {
byte[] rem = new byte[cache.remaining()];
cache.get(rem);
assert !cache.hasRemaining();
cache.clear();
cache.put(rem);
getChannel().read(cache);
int read = getChannel().read(cache);
cache.flip();
return read == -1;
}
return false;
}
protected void invalidateCache() {
......
......@@ -267,6 +267,12 @@ public class FileConnections {
return true;
}
@Override
public long seek(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException {
invalidateCache();
return DelegateRConnection.seek(channel, offset, seekMode, seekRWMode);
}
@Override
public ByteChannel getChannel() {
return channel;
......@@ -425,6 +431,7 @@ public class FileConnections {
default:
throw RError.nyi(RError.SHOW_CALLER, "seek mode");
}
invalidateCache();
return result;
}
......@@ -581,6 +588,7 @@ public class FileConnections {
writeOffset = offset;
break;
}
invalidateCache();
return result;
}
......
......@@ -217,6 +217,7 @@ public class RawConnections {
@Override
public long seek(long offset, SeekMode seekMode, SeekRWMode seekRWMode) throws IOException {
invalidateCache();
return RawRConnection.seek(channel, offset, seekMode, seekRWMode);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment