Skip to content
Snippets Groups Projects
RTalk.java 9.84 KiB
package org.rosuda.REngine.Rserve.protocol;

// JRclient library - client interface to Rserve, see http://www.rosuda.org/Rserve/
// Copyright (C) 2004 Simon Urbanek
// --- for licensing information see LICENSE file in the original JRclient distribution ---

import java.util.*;
import java.io.*;
import java.net.*;

import org.rosuda.REngine.Rserve.RConnection;

/** This class encapsulates the QAP1 protocol used by Rserv.
    it is independent of the underying protocol(s), therefore RTalk
    can be used over any transport layer
    <p>
    The current implementation supports long (0.3+/0102) data format only
    up to 32-bit and only for incoming packets.
    <p>
    @version $Id$
*/
public class RTalk {
    public static final int DT_INT=1;
    public static final int DT_CHAR=2;
    public static final int DT_DOUBLE=3;
    public static final int DT_STRING=4;
    public static final int DT_BYTESTREAM=5;
    public static final int DT_SEXP=10;
    public static final int DT_ARRAY=11;

    /** this is a flag saying that the contents is large (>0xfffff0) and hence uses 56-bit length field */
    public static final int DT_LARGE=64;

    public static final int CMD_login=0x001;
    public static final int CMD_voidEval=0x002;
    public static final int CMD_eval=0x003;
    public static final int CMD_shutdown=0x004;
    public static final int CMD_openFile=0x010;
    public static final int CMD_createFile=0x011;
    public static final int CMD_closeFile=0x012;
    public static final int CMD_readFile=0x013;
    public static final int CMD_writeFile=0x014;
    public static final int CMD_removeFile=0x015;
    public static final int CMD_setSEXP=0x020;
    public static final int CMD_assignSEXP=0x021;
    
    public static final int CMD_setBufferSize=0x081;
    public static final int CMD_setEncoding=0x082;

    public static final int CMD_detachSession=0x030;
    public static final int CMD_detachedVoidEval=0x031;
    public static final int CMD_attachSession=0x032;

    // control commands since 0.6-0
    public static final int CMD_ctrlEval=0x42;
    public static final int CMD_ctrlSource=0x45;
    public static final int CMD_ctrlShutdown=0x44; 
    
    // errors as returned by Rserve
    public static final int ERR_auth_failed=0x41;
    public static final int ERR_conn_broken=0x42;
    public static final int ERR_inv_cmd=0x43;
    public static final int ERR_inv_par=0x44;
    public static final int ERR_Rerror=0x45;
    public static final int ERR_IOerror=0x46;
    public static final int ERR_not_open=0x47;
    public static final int ERR_access_denied=0x48;
    public static final int ERR_unsupported_cmd=0x49;
    public static final int ERR_unknown_cmd=0x4a;
    public static final int ERR_data_overflow=0x4b;
    public static final int ERR_object_too_big=0x4c;
    public static final int ERR_out_of_mem=0x4d;
    public static final int ERR_ctrl_closed=0x4e;
    public static final int ERR_session_busy=0x50;
    public static final int ERR_detach_failed=0x51;
   
    InputStream is;
    OutputStream os;
    
    /** constructor; parameters specify the streams
	@param sis socket input stream
	@param sos socket output stream */

    public RTalk(InputStream sis, OutputStream sos) {
	is=sis; os=sos;
    }

    /** writes bit-wise int to a byte buffer at specified position in Intel-endian form
	@param v value to be written
	@param buf buffer
	@param o offset in the buffer to start at. An int takes always 4 bytes */
    public static void setInt(int v, byte[] buf, int o) {
	buf[o]=(byte)(v&255); o++;
	buf[o]=(byte)((v&0xff00)>>8); o++;
	buf[o]=(byte)((v&0xff0000)>>16); o++;
	buf[o]=(byte)((v&0xff000000)>>24);
    }

    /** writes cmd/resp/type byte + 3/7 bytes len into a byte buffer at specified offset.
	@param ty type/cmd/resp byte
	@param len length
	@param buf buffer
	@param o offset
        @return offset in buf just after the header. Please note that since Rserve 0.3 the header can be either 4 or 8 bytes long, depending on the len parameter.
        */
    public static int setHdr(int ty, int len, byte[] buf, int o) {
        buf[o]=(byte)((ty&255)|((len>0xfffff0)?DT_LARGE:0)); o++;
	buf[o]=(byte)(len&255); o++;
	buf[o]=(byte)((len&0xff00)>>8); o++;
	buf[o]=(byte)((len&0xff0000)>>16); o++;
        if (len>0xfffff0) { // for large data we need to set the next 4 bytes as well
            buf[o]=(byte)((len&0xff000000)>>24); o++;
            buf[o]=0; o++; // since len is int, we get 32-bits only
            buf[o]=0; o++;
            buf[o]=0; o++;
        }
        return o;
    }

    /** creates a new header according to the type and length of the parameter
        @param ty type/cmd/resp byte
        @param len length */        
    public static byte[] newHdr(int ty, int len) {
        byte[] hdr=new byte[(len>0xfffff0)?8:4];
        setHdr(ty,len,hdr,0);
        return hdr;
    }
    
    /** converts bit-wise stored int in Intel-endian form into Java int
	@param buf buffer containg the representation
	@param o offset where to start (4 bytes will be used)
	@return the int value. no bounds checking is done so you need to
	        make sure that the buffer is big enough */
    public static int getInt(byte[] buf, int o) {
	return ((buf[o]&255)|((buf[o+1]&255)<<8)|((buf[o+2]&255)<<16)|((buf[o+3]&255)<<24));
    }

    /** converts bit-wise stored length from a header. "long" format is supported up to 32-bit
	@param buf buffer
	@param o offset of the header (length is at o+1)
	@return length */
    public static int getLen(byte[] buf, int o) {

        return
        ((buf[o]&64)>0)? // "long" format; still - we support 32-bit only
        ((buf[o+1]&255)|((buf[o+2]&255)<<8)|((buf[o+3]&255)<<16)|((buf[o+4]&255)<<24))
        :
        ((buf[o+1]&255)|((buf[o+2]&255)<<8)|((buf[o+3]&255)<<16));
    }

    /** converts bit-wise Intel-endian format into long
	@param buf buffer
	@param o offset (8 bytes will be used)
	@return long value */
    public static long getLong(byte[] buf, int o) {
	long low=((long)getInt(buf,o))&0xffffffffL;
	long hi=((long)getInt(buf,o+4))&0xffffffffL;
	hi<<=32; hi|=low;
	return hi;
    }

    public static void setLong(long l, byte[] buf, int o) {
	setInt((int)(l&0xffffffffL),buf,o);
	setInt((int)(l>>32),buf,o+4);
    }

    /** sends a request with no attached parameters
	@param cmd command
	@return returned packet or <code>null</code> if something went wrong */
    public RPacket request(int cmd) {
        byte[] d = new byte[0];
        return request(cmd,d);
    }

    /** sends a request with attached parameters
        @param cmd command
        @param cont contents - parameters
        @return returned packet or <code>null</code> if something went wrong */
    public RPacket request(int cmd, byte[] cont) {
        return request(cmd,null,cont,0,(cont==null)?0:cont.length);
    }

    /** sends a request with attached prefix and  parameters. Both prefix and cont can be <code>null</code>. Effectively <code>request(a,b,null)</code> and <code>request(a,null,b)</code> are equivalent.
	@param cmd command - a special command of -1 prevents request from sending anything
        @param prefix - this content is sent *before* cont. It is provided to save memory copy operations where a small header precedes a large data chunk (usually prefix conatins the parameter header and cont contains the actual data).
        @param cont contents
        @param offset offset in cont where to start sending (if <0 then 0 is assumed, if >cont.length then no cont is sent)
        @param len number of bytes in cont to send (it is clipped to the length of cont if necessary)
	@return returned packet or <code>null</code> if something went wrong */
    public RPacket request(int cmd, byte[] prefix, byte[] cont, int offset, int len) {
        if (cont!=null) {
            if (offset>=cont.length) { cont=null; len=0; }
            else if (len>cont.length-offset) len=cont.length-offset;
        }
        if (offset<0) offset=0;
        if (len<0) len=0;
        int contlen=(cont==null)?0:len;
        if (prefix!=null && prefix.length>0) contlen+=prefix.length;
	byte[] hdr=new byte[16];
	setInt(cmd,hdr,0);
	setInt(contlen,hdr,4);
	for(int i=8;i<16;i++) hdr[i]=0;
	try {
	    if (cmd!=-1) {
		os.write(hdr);
		if (prefix!=null && prefix.length>0)
		    os.write(prefix);
		if (cont!=null && cont.length>0)
		    os.write(cont,offset,len);
	    }

	    byte[] ih=new byte[16];
	    if (is.read(ih)!=16)
		return null;
	    int rep=getInt(ih,0);
	    int rl =getInt(ih,4);
	    if (rl>0) {
		byte[] ct=new byte[rl];
                int n=0;
                while (n<rl) {
                    int rd=is.read(ct,n,rl-n);
                    n+=rd;
                }
		return new RPacket(rep,ct);
	    }
	    return new RPacket(rep,null);
	} catch(Exception e) {
	    e.printStackTrace();
	    return null;
	}
    }

    /** sends a request with one string parameter attached
	@param cmd command
	@param par parameter - length and DT_STRING will be prepended
	@return returned packet or <code>null</code> if something went wrong */
    public RPacket request(int cmd, String par) {
	try {
            byte[] b=par.getBytes(RConnection.transferCharset);
            int sl=b.length+1;
            if ((sl&3)>0) sl=(sl&0xfffffc)+4; // make sure the length is divisible by 4
	    byte[] rq=new byte[sl+5];
            int i;
	    for(i=0;i<b.length;i++)
                rq[i+4]=b[i];
            while (i<sl) { // pad with 0
                rq[i+4]=0; i++;
            };
	    setHdr(DT_STRING,sl,rq,0);
	    return request(cmd,rq);
	} catch (Exception e) {
	    e.printStackTrace();
	}
	return null;
    }

    /** sends a request with one string parameter attached
        @param cmd command
        @param par parameter of the type DT_INT
        @return returned packet or <code>null</code> if something went wrong */
    public RPacket request(int cmd, int par) {
	try {
	    byte[] rq=new byte[8];
	    setInt(par,rq,4);
	    setHdr(DT_INT,4,rq,0);
	    return request(cmd,rq);
	} catch (Exception e) {
	};
	return null;
    }
}