// BEGIN LICENSE BLOCK
// Version: CMPL 1.1
//
// The contents of this file are subject to the Cisco-style Mozilla Public
// License Version 1.1 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License
// at www.eclipse-clp.org/license.
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and limitations
// under the License.
//
// The Original Code is The ECLiPSe Constraint Logic Programming System.
// The Initial Developer of the Original Code is Cisco Systems, Inc.
// Portions created by the Initial Developer are
// Copyright (C) 2001 - 2006 Cisco Systems, Inc. All Rights Reserved.
//
// Contributor(s): Josh Singer, Parc Technologies
//
// END LICENSE BLOCK
//Title: Java/ECLiPSe interface
//Version: $Id: RemoteEclipse.java,v 1.1 2006/09/23 01:54:12 snovello Exp $
//Author: Josh Singer
//Company: Parc Technologies
//Description: Connection to a remote ECLiPSe.
package com.parctechnologies.eclipse;
import java.io.*;
import java.net.*;
import java.util.*;
/**
* Remote connection to an existing ECLiPSe process. The connection may be made
* over a TCP/IP network to an ECLiPSe which has been primed using the
* {@code remote_connect/3} or the {@code remote_connect_setup/3}
* builtin predicates. As well as the functionality provided by the
* {@code EclipseConnection} interface, {@code RemoteEclipse} also allows for
* execution control to be transferred explicitly over to ECLiPSe with the
* {@code resume()} method.
*
* <p>The connection is terminated from the Java side using either the
* {@code disconnect()} method (when Java has execution control) or
* {@code unilateralDisconnect()} (when ECLiPSe has execution control).
* If the builtin predicate {@code remote_disconnect/1} is executed on
* the ECLiPSe side, the effects on the Java side are similar to the
* effects of {@code disconnect()}.
*/
public class RemoteEclipse extends EclipseConnectionImpl implements EclipseConnection
{
/**
* Implementation overview
*
* I assume familiarity with the ECLiPSe remote interface protocol.
*
* The main complication to the implementation of this class is the fact that
* ECLiPSe sockets have a very small fixed buffer size. Consequences:
* (1) When a client writes to an Eclipse socket and the socket's buffer
* reaches capacity, the client's write/flush command blocks indefinitely
* unless Eclipse is reading from the socket's stream. Because of this block,
* any read call which the user may set up using rpc is never reached.
*
* (2) Conversely, when Eclipse flushes an amount of data through the socket
* stream which is larger than capacity, its own write command blocks
* indefinitely unless there is a corresponding read command waiting on the
* Java side. However, the user cannot set up this read after Eclipse flushes,
* because control does not return to Java until after the write command
* completes.
*
* The way we handle (1) is to enclose the socket OutputStream in a special
* NonBlockingOutputStream, whose write and flush methods are non-blocking.
* A thread is hidden within each NonBlockingOutputStream object. The thread
* is activated whenever the NonBlockingOutputStream is flushed and handles
* the write and flush calls to the underlying stream (the socket in this
* case) concurrently. The hidden thread may block, but the original thread
* is unblocked. It can therefore go ahead and set up a read call on the
* Eclipse side, which will remove the blockage, and read the data written by
* the hidden thread.
*
* (2) is handled using a buffer on the Java side. Before attempting to write
* the data, Eclipse sends an ec_flushio signal, indicating the amount about
* to be written. This amount of data is immediately read into a buffer.
* From there it can be read by the user later. See the
* FromEclipseQueueBuffer class for more details.
*
*/
// The socket for sending/receiving control signals
private Socket control;
// The socket for sending/receiving rpc goals
private Socket rpc;
// The name of the Eclipse remote connection which this object is connected
// to. It is read from the control socket during the connection handshake
// and must be echoed by ECLiPSe on the rpc socket.
private String connectionName;
// Stream for receiving control signals in EXDR format
private EXDRInputStream controlEXDRInput;
// Stream for receiving rpc goals in EXDR format
private EXDRInputStream rpcEXDRInput;
// Stream for sending control signals in EXDR format
private EXDROutputStream controlEXDROutput;
// Stream for sending rpc goals in EXDR format
private EXDROutputStream rpcEXDROutput;
// common atoms: constant terms written to / compared against terms read
// from the control connection, or used as arguments of control terms.
// Written on the control connection to hand execution control to ECLiPSe.
private static final Atom resumeAtom = new Atom("resume");
// Written on the control connection before a goal is sent on the rpc socket.
private static final Atom rpcAtom = new Atom("rpc");
// Reply from ECLiPSe which the queue-close methods wait for.
private static final Atom yieldAtom = new Atom("yield");
// Written by disconnect() to request termination of the connection.
private static final Atom disconnectAtom = new Atom("disconnect");
// The reply expected by disconnect() after sending disconnectAtom.
private static final Atom disconnectYieldAtom = new Atom("disconnect_yield");
// Written by unilateralDisconnect().
private static final Atom disconnectResumeAtom = new Atom("disconnect_resume");
// Queue type arguments of the queue_create control term.
private static final Atom syncAtom = new Atom("sync");
private static final Atom asyncAtom = new Atom("async");
// Queue direction arguments of the queue_create control term.
private static final Atom fromecAtom = new Atom("fromec");
private static final Atom toecAtom = new Atom("toec");
private static final Atom bidirectAtom = new Atom("bidirect");
// Passed as the fourth argument of queue_create.
private static final Atom emptyAtom = new Atom("");
// Second argument of the socket_connect control term: outcome of the
// Java-side attempt to connect the queue socket.
private static final Atom failAtom = new Atom("fail");
private static final Atom successAtom = new Atom("success");
// NOTE(review): presumably maps queue stream ids to per-queue bookkeeping
// (socket, buffer, byte count); its users are not visible in this part of
// the file -- confirm.
private Map queueInfo = new HashMap();
// The address of the machine which Eclipse is running on, as read during the
// protocol. This should be used in subsequent client socket connections.
private InetAddress hostAddress;
// Version number sent to ECLiPSe in the remote_protocol(Version) term at the
// start of the connection handshake.
private static final int PROTOCOL_VERSION_NUMBER = 1;
// Timeout, in milliseconds, used by the constructors which do not take an
// explicit timeoutMillis parameter.
public static final int DEFAULT_TIMEOUT_MILLIS = 30000;
/**
* Make a connection to an existing ECLiPSe process. The ECLiPSe process must
* be on a server machine which is reachable on a TCP/IP network. ECLiPSe must be
* primed using {@code remote_connect_setup/3}. The builtin predicate
* {@code remote_connect_accept/6} should then be used to complete the
* connection. The connection details
* (IP address, port number, password) are specified as parameters and must match those
* specified/returned as arguments in the execution of
* {@code remote_connect_setup/3} and {@code remote_connect_accept/6}.
*
* This constructor delegates to the four-argument constructor, using
* {@code DEFAULT_TIMEOUT_MILLIS} as the timeout.
*
* @param remoteEclipseHost address of the machine ECLiPSe is running on.
* @param remoteEclipsePort port on which ECLiPSe is waiting for the connection.
* @param passwd password sent to ECLiPSe during the handshake.
*
* @throws IOException if the connection could not be made, or times out
* within {@code DEFAULT_TIMEOUT_MILLIS} milliseconds.
*/
public RemoteEclipse(InetAddress remoteEclipseHost, int remoteEclipsePort,
String passwd)
throws IOException
{
this(remoteEclipseHost, remoteEclipsePort, passwd, DEFAULT_TIMEOUT_MILLIS);
}
/**
* Make a connection to an existing ECLiPSe process. The ECLiPSe process must be
* on a server machine which is reachable on a TCP/IP network. ECLiPSe must be
* primed using {@code remote_connect/3}. The connection details
* (IP address, port number) are specified as parameters and must match those
* specified/returned as arguments in the execution of
* {@code remote_connect/3}. If {@code remote_connect_setup/3} was
* used to prime ECLiPSe for the remote connection, this constructor will fail
* as it does not use a password.
*
* This constructor delegates to the four-argument constructor, passing the
* empty string as the password and {@code DEFAULT_TIMEOUT_MILLIS} as the
* timeout.
*
* @param remoteEclipseHost address of the machine ECLiPSe is running on.
* @param remoteEclipsePort port on which ECLiPSe is waiting for the connection.
*
* @throws IOException if the connection could not be made, or times out
* within {@code DEFAULT_TIMEOUT_MILLIS} milliseconds.
*/
public RemoteEclipse(InetAddress remoteEclipseHost, int remoteEclipsePort)
throws IOException
{
this(remoteEclipseHost, remoteEclipsePort, "", DEFAULT_TIMEOUT_MILLIS);
}
/**
* Make a connection to an existing ECLiPSe process. The ECLiPSe process must
* be on a server machine which is reachable on a TCP/IP network. ECLiPSe must be
* primed using {@code remote_connect/3}. The connection details
* (IP address, port number) are specified as parameters and must match those
* specified/returned as arguments in the execution of
* {@code remote_connect/3}. If {@code remote_connect_setup/3} was
* used to prime ECLiPSe for the remote connection, this constructor will fail
* as it does not use a password.
*
* This constructor delegates to the four-argument constructor, passing the
* empty string as the password.
*
* @param remoteEclipseHost address of the machine ECLiPSe is running on.
* @param remoteEclipsePort port on which ECLiPSe is waiting for the connection.
* @param timeoutMillis number of milliseconds to wait for the initial
* connection to be established before throwing an exception. Set
* {@code timeoutMillis} to 0 to wait indefinitely for the connection.
*
* @throws IOException if the connection could not be made, or times out
* within {@code timeoutMillis} milliseconds.
*/
public RemoteEclipse(InetAddress remoteEclipseHost, int remoteEclipsePort,
int timeoutMillis)
throws IOException
{
this(remoteEclipseHost, remoteEclipsePort, "", timeoutMillis);
}
// Opens the control socket to the given host/port and wraps its two
// directions in EXDR streams.
private void setUpControl(InetAddress remoteEclipseHost,
                          int remoteEclipsePort)
  throws IOException
{
  control = new Socket(remoteEclipseHost, remoteEclipsePort);

  // EXDR reader for control signals coming from ECLiPSe
  InputStream signalsFromEclipse = control.getInputStream();
  controlEXDRInput = new EXDRInputStream(signalsFromEclipse);

  // EXDR writer for control signals going to ECLiPSe
  OutputStream signalsToEclipse = control.getOutputStream();
  controlEXDROutput = new EXDROutputStream(signalsToEclipse);
}
// Opens the rpc socket to the given host/port and wraps its two directions
// in EXDR streams.
private void setUpRPC(InetAddress remoteEclipseHost, int remoteEclipsePort)
  throws IOException
{
  rpc = new Socket(remoteEclipseHost, remoteEclipsePort);

  // EXDR reader for terms arriving on the rpc socket
  InputStream goalsFromEclipse = rpc.getInputStream();
  rpcEXDRInput = new EXDRInputStream(goalsFromEclipse);

  // EXDR writer for goals sent on the rpc socket
  OutputStream goalsToEclipse = rpc.getOutputStream();
  rpcEXDROutput = new EXDROutputStream(goalsToEclipse);
}
/**
* Make a connection to an existing ECLiPSe process. The ECLiPSe process must
* be on a server machine which is reachable on a TCP/IP network. ECLiPSe must be
* primed using {@code remote_connect_setup/3}. The builtin predicate
* {@code remote_connect_accept/6} should then be used to complete the
* connection. The connection details
* (IP address, port number, password) are specified as parameters and must match those
* specified/returned as arguments in the execution of
* {@code remote_connect_setup/3} and {@code remote_connect_accept/6}.
*
* The handshake performed here is: open the control socket, send
* {@code remote_protocol(Version)} and expect {@code "yes"}, send the
* password, read the connection name, send the language name
* {@code "java"}, open the rpc socket and check that the same connection
* name is read from it. If any step fails, every socket opened so far is
* closed before the exception is propagated.
*
* @param remoteEclipseHost address of the machine ECLiPSe is running on.
* @param remoteEclipsePort port on which ECLiPSe is waiting for the connection.
* @param passwd password sent to ECLiPSe during the handshake.
* @param timeoutMillis number of milliseconds to wait for the initial
* connection to be established before throwing an exception. Set
* {@code timeoutMillis} to 0 to wait indefinitely for the connection.
*
* @throws IOException if the connection could not be made, or times out
* within {@code timeoutMillis} milliseconds.
*/
public RemoteEclipse(InetAddress remoteEclipseHost, int remoteEclipsePort,
                     String passwd, int timeoutMillis)
  throws IOException
{
  hostAddress = remoteEclipseHost;
  // Becomes true only once the whole handshake has succeeded; used by the
  // finally clause to decide whether the sockets must be released.
  boolean handshakeComplete = false;
  try
  {
    setUpControl(remoteEclipseHost, remoteEclipsePort);
    CompoundTerm protocolTerm =
      new CompoundTermImpl("remote_protocol",
                           new Integer(PROTOCOL_VERSION_NUMBER));
    writeControl(protocolTerm);
    Object presponse = readControl();
    // constant-first comparison so that a null response is reported as a
    // protocol error rather than a NullPointerException
    if(!"yes".equals(presponse))
    {
      throw(new IOException("Remote protocol error: protocol version unsupported."));
    }
    writeControl(passwd);
    // read the connectionName on the control socket
    connectionName = ((Atom) readControlTimeout(timeoutMillis)).functor();
    // write the language name on the control socket
    writeControl("java");
    setUpRPC(remoteEclipseHost, remoteEclipsePort);
    // ECLiPSe must echo the connection name on the rpc socket
    if(!connectionName.equals(((Atom) readRPCTimeout(timeoutMillis)).functor()))
    {
      throw(new IOException("Remote protocol error."));
    }
    // set up the peer name locally
    setPeerName(new Atom(connectionName));
    handshakeComplete = true;
  }
  finally
  {
    if(!handshakeComplete)
    {
      // The caller never receives a reference to this object when the
      // constructor throws, so nobody else could close these sockets.
      if(control != null)
      {
        try
        {
          control.close();
        }
        catch(IOException ignored)
        {
          // best effort: the original failure is the one worth reporting
        }
      }
      if(rpc != null)
      {
        try
        {
          rpc.close();
        }
        catch(IOException ignored)
        {
          // best effort: the original failure is the one worth reporting
        }
      }
    }
  }
}
/**
* Terminate the connection with ECLiPSe unilaterally. This method should be
* invoked in unforeseen circumstances when the connection should be terminated
* while ECLiPSe has execution control. After
* {@code unilateralDisconnect} has
* been invoked, public methods invoked on this {@code RemoteEclipse} will
* throw {@code EclipseTerminatedException}s.
*
* @throws EclipseTerminatedException if the connection has already been
* terminated.
*/
public void unilateralDisconnect() throws EclipseTerminatedException
{
  // testTerminated() calls back into this method while the flag is set, so
  // the flag is cleared first to avoid recursive calls.
  shouldDisconnect = false;
  testTerminated();

  // Tell ECLiPSe about the disconnection on a best-effort basis. The EXDR
  // stream is used directly: writeControl might cause a recursive
  // invocation of unilateralDisconnect.
  try
  {
    controlEXDROutput.write(disconnectResumeAtom);
    controlEXDROutput.flush();
  }
  catch(Exception ignored)
  {
  }

  // Shut down the Java side; problems while doing so are not reported.
  try
  {
    terminateJavaSide();
  }
  catch(IOException ignored)
  {
  }
}
/**
* Terminate the remote connection to ECLiPSe. This should be invoked while
* Java has control. If ECLiPSe has control then use
* {@code unilateralDisconnect} instead. After {@code disconnect} has
* been invoked, public methods invoked on this {@code RemoteEclipse} will
* throw {@code EclipseTerminatedException}s.
*
* @throws EclipseTerminatedException if the connection has already been
* terminated.
* @throws IOException if there was a problem communicating with ECLiPSe
* during disconnection.
*/
public synchronized void disconnect() throws IOException
{
  testTerminated();

  // ask ECLiPSe to disconnect
  writeControl(disconnectAtom);

  // ECLiPSe is expected to answer with "disconnect_yield"; a null reply
  // stands for "the connection was already gone", which is acceptable.
  Object reply;
  try
  {
    reply = readControl();
  }
  catch(IOException connectionAlreadyLost)
  {
    reply = null;
  }

  boolean unexpectedReply =
    (reply != null) && !reply.equals(disconnectYieldAtom);
  if(unexpectedReply)
  {
    throw new IOException("Remote protocol error.");
  }

  terminateJavaSide();
}
// Marks this connection as terminated and releases all Java-side resources:
// the user queues (Java side only) and the control and rpc sockets.
// The sockets are closed even if closing the queues fails; an IOException
// from closing the queues is still propagated to the caller.
private void terminateJavaSide() throws IOException
{
  // set this object to terminated
  terminated = true;
  try
  {
    // close all user queues, but not the eclipse sides
    closeAllQueues(false);
  }
  finally
  {
    // try to close control and rpc sockets, but if this fails, don't worry.
    // This runs in a finally clause because the object is already marked
    // terminated, so there is no later opportunity to release the sockets.
    try
    {
      control.close();
    }
    catch(Exception ignored)
    {
    }
    try
    {
      rpc.close();
    }
    catch(Exception ignored)
    {
    }
  }
}
/**
* Explicitly transfer execution control to ECLiPSe. ECLiPSe will resume
* execution immediately after the last goal which transferred control to
* Java (normally {@code remote_connect/3} or
* {@code remote_connect_setup/3}). This method should not be
* invoked while control has been transferred to Java using a
* {@code QueueListener}. An invocation of {@code resume()} should
* normally be paired with an execution of the builtin predicate
* {@code remote_yield/1}, which can return execution control to Java.
*
* @throws EclipseTerminatedException if the connection to ECLiPSe has been
* terminated.
* @throws IOException if there was a problem communicating with ECLiPSe.
*/
public synchronized void resume() throws IOException
{
// fail early if the connection has already been terminated
testTerminated();
// NOTE(review): the "true" argument presumably requests that control be
// handed over by sending a resume signal first -- confirm against
// waitForEclipse, which is not visible here.
waitForEclipse(true);
}
// Remote-specific part of creating a FromEclipseQueue, additional to what is
// done in EclipseConnectionImpl: asks ECLiPSe to create a synchronous
// "fromec" queue, validates the socket_client reply, connects the queue
// socket and finally resumes ECLiPSe.
void setupFromEclipseQueue(String name) throws EclipseException, IOException
{
  Atom queueName = new Atom(name);

  // write queue_create(QueueName, sync, fromec, '') on the control connection
  writeControl(new CompoundTermImpl("queue_create", queueName, syncAtom,
                                    fromecAtom, emptyAtom));

  // the reply on the control connection must be a compound term
  CompoundTerm reply;
  try
  {
    reply = (CompoundTerm) readControl();
  }
  catch(ClassCastException notACompoundTerm)
  {
    throw new IOException("Remote interface protocol error.");
  }

  // a plain "yield" means ECLiPSe refused to create its side of the queue
  if(reply.equals(yieldAtom))
  {
    throw new EclipseException("Could not create ECLiPSe side of queue.");
  }

  // expected: socket_client(Port, QueueName, sync, fromec)
  boolean obeysProtocol =
    reply.functor().equals("socket_client") &&
    reply.arity() == 4 &&
    reply.arg(1) instanceof Integer &&
    reply.arg(2) instanceof Atom &&
    reply.arg(3) instanceof Atom &&
    reply.arg(4) instanceof Atom &&
    ((Atom) reply.arg(2)).equals(queueName) &&
    ((Atom) reply.arg(3)).functor().equals("sync") &&
    ((Atom) reply.arg(4)).functor().equals("fromec");
  if(!obeysProtocol)
  {
    throw new IOException("Remote interface protocol error.");
  }

  // port on the ECLiPSe side to which the queue socket must connect
  int eclipsePort = ((Integer) reply.arg(1)).intValue();
  setupRemoteFromecQueue(queueName, eclipsePort);

  // send the resume message + wait for yield.
  resume();
}
// Connects the socket of a new "fromec" queue to the given port on the
// ECLiPSe host, reports the outcome to ECLiPSe with socket_connect/2, checks
// the socket_accept(QueueName, StreamId) reply and registers the socket under
// the returned stream id.
// If anything fails after the socket has been connected, the socket is closed
// before the exception is propagated, so that it is not leaked.
private void setupRemoteFromecQueue(Atom nameAtom, int port)
  throws IOException
{
  // Socket to be used for the new queue
  Socket newSocket;
  // try to connect the new socket to the address specified during the remote
  // protocol initialisation and the port specified above.
  try
  {
    newSocket = new Socket(hostAddress, port);
  }
  catch(IOException e) // thrown if (for example), something else
                       // has stolen the connection.
  {
    // inform ECLiPSe that socket creation failed
    writeControl(new CompoundTermImpl("socket_connect", nameAtom, failAtom));
    // throw the exception
    throw e;
  }

  // true once the socket has been handed over to the queue bookkeeping
  boolean socketRegistered = false;
  try
  {
    // inform ECLiPSe that socket creation succeeded
    writeControl(new CompoundTermImpl("socket_connect", nameAtom, successAtom));

    // read the next term from the control connection; anything which is not
    // a compound term (including null) violates the protocol
    Object response = readControl();
    if(!(response instanceof CompoundTerm))
    {
      throw new IOException("Remote interface protocol error.");
    }
    CompoundTerm result2 = (CompoundTerm) response;

    // expected: socket_accept(QueueName, StreamId)
    if(!result2.functor().equals("socket_accept") ||
       result2.arity() != 2 ||
       !(result2.arg(1) instanceof Atom) ||
       !(result2.arg(2) instanceof Integer) ||
       !((Atom) result2.arg(1)).equals(nameAtom))
    {
      throw new IOException("Remote interface protocol error.");
    }

    // extract the stream number id of the named queue
    int id = ((Integer) result2.arg(2)).intValue();
    setupFromecInfo(id, newSocket);
    socketRegistered = true;
  }
  finally
  {
    if(!socketRegistered)
    {
      try
      {
        newSocket.close();
      }
      catch(IOException ignored)
      {
        // best effort: the original failure is the one worth reporting
      }
    }
  }
}
// Remote-specific part of creating a ToEclipseQueue, additional to what is
// done in EclipseConnectionImpl: asks ECLiPSe to create a synchronous
// "toec" queue, validates the socket_client reply, connects the queue
// socket and finally resumes ECLiPSe.
void setupToEclipseQueue(String name) throws EclipseException, IOException
{
  Atom queueName = new Atom(name);

  // write queue_create(QueueName, sync, toec, '') on the control connection
  writeControl(new CompoundTermImpl("queue_create", queueName, syncAtom,
                                    toecAtom, emptyAtom));

  // the reply on the control connection must be a compound term
  CompoundTerm reply;
  try
  {
    reply = (CompoundTerm) readControl();
  }
  catch(ClassCastException notACompoundTerm)
  {
    throw new IOException("Remote interface protocol error.");
  }

  // a plain "yield" means ECLiPSe refused to create its side of the queue
  if(reply.equals(yieldAtom))
  {
    throw new EclipseException("Could not create ECLiPSe side of queue.");
  }

  // expected: socket_client(Port, QueueName, sync, toec)
  boolean obeysProtocol =
    reply.functor().equals("socket_client") &&
    reply.arity() == 4 &&
    reply.arg(1) instanceof Integer &&
    reply.arg(2) instanceof Atom &&
    reply.arg(3) instanceof Atom &&
    reply.arg(4) instanceof Atom &&
    ((Atom) reply.arg(2)).equals(queueName) &&
    ((Atom) reply.arg(3)).functor().equals("sync") &&
    ((Atom) reply.arg(4)).functor().equals("toec");
  if(!obeysProtocol)
  {
    throw new IOException("Remote interface protocol error.");
  }

  // port on the ECLiPSe side to which the queue socket must connect
  int eclipsePort = ((Integer) reply.arg(1)).intValue();
  setupRemoteToecQueue(queueName, eclipsePort);

  // send the resume message + wait for yield
  resume();
}
// Connects the socket of a new "toec" queue to the given port on the
// ECLiPSe host, reports the outcome to ECLiPSe with socket_connect/2, checks
// the socket_accept(QueueName, StreamId) reply and registers the socket under
// the returned stream id.
// If anything fails after the socket has been connected, the socket is closed
// before the exception is propagated, so that it is not leaked.
private void setupRemoteToecQueue(Atom nameAtom, int port)
  throws IOException
{
  // Socket to be used for the new queue
  Socket newSocket;
  // try to connect the new socket to the address specified during the remote
  // protocol initialisation and the port specified above.
  try
  {
    newSocket = new Socket(hostAddress, port);
  }
  catch(IOException e) // thrown if (for example), something else
                       // has stolen the connection.
  {
    // inform ECLiPSe that socket creation failed
    writeControl(new CompoundTermImpl("socket_connect", nameAtom, failAtom));
    // throw the exception
    throw e;
  }

  // true once the socket has been handed over to the queue bookkeeping
  boolean socketRegistered = false;
  try
  {
    // inform ECLiPSe that socket creation succeeded
    writeControl(new CompoundTermImpl("socket_connect", nameAtom, successAtom));

    // read the next term from the control connection; anything which is not
    // a compound term (including null) violates the protocol
    Object response = readControl();
    if(!(response instanceof CompoundTerm))
    {
      throw new IOException("Remote interface protocol error.");
    }
    CompoundTerm result2 = (CompoundTerm) response;

    // expected: socket_accept(QueueName, StreamId)
    if(!result2.functor().equals("socket_accept") ||
       result2.arity() != 2 ||
       !(result2.arg(1) instanceof Atom) ||
       !(result2.arg(2) instanceof Integer) ||
       !((Atom) result2.arg(1)).equals(nameAtom))
    {
      throw new IOException("Remote interface protocol error.");
    }

    // extract the stream number id of the named queue
    int id = ((Integer) result2.arg(2)).intValue();
    this.setupToecInfo(id, newSocket);
    socketRegistered = true;
  }
  finally
  {
    if(!socketRegistered)
    {
      try
      {
        newSocket.close();
      }
      catch(IOException ignored)
      {
        // best effort: the original failure is the one worth reporting
      }
    }
  }
}
//-------------------------------------------------------------------
// Remote-specific part of creating an asynchronous queue, additional to what
// is done in EclipseConnectionImpl: asks ECLiPSe to create an "async",
// "bidirect" queue, validates the socket_client reply, connects the queue
// socket and finally resumes ECLiPSe.
void setupAsyncEclipseQueue(String name) throws EclipseException, IOException
{
  Atom queueName = new Atom(name);

  // write queue_create(QueueName, async, bidirect, '') on the control
  // connection
  writeControl(new CompoundTermImpl("queue_create", queueName, asyncAtom,
                                    bidirectAtom, emptyAtom));

  // the reply on the control connection must be a compound term
  CompoundTerm reply;
  try
  {
    reply = (CompoundTerm) readControl();
  }
  catch(ClassCastException notACompoundTerm)
  {
    throw new IOException("Remote interface protocol error.");
  }

  // a plain "yield" means ECLiPSe refused to create its side of the queue
  if(reply.equals(yieldAtom))
  {
    throw new EclipseException("Could not create ECLiPSe side of queue.");
  }

  // expected: socket_client(Port, QueueName, async, bidirect)
  boolean obeysProtocol =
    reply.functor().equals("socket_client") &&
    reply.arity() == 4 &&
    reply.arg(1) instanceof Integer &&
    reply.arg(2) instanceof Atom &&
    reply.arg(3) instanceof Atom &&
    reply.arg(4) instanceof Atom &&
    ((Atom) reply.arg(2)).equals(queueName) &&
    ((Atom) reply.arg(3)).functor().equals("async") &&
    ((Atom) reply.arg(4)).functor().equals("bidirect");
  if(!obeysProtocol)
  {
    throw new IOException("Remote interface protocol error.");
  }

  // port on the ECLiPSe side to which the queue socket must connect
  int eclipsePort = ((Integer) reply.arg(1)).intValue();
  setupRemoteAsyncecQueue(queueName, eclipsePort);

  // send the resume message + wait for yield
  resume();
}
// Connects the socket of a new asynchronous queue to the given port on the
// ECLiPSe host, reports the outcome to ECLiPSe with socket_connect/2, checks
// the socket_accept(QueueName, StreamId) reply and registers the socket under
// the returned stream id.
// If anything fails after the socket has been connected, the socket is closed
// before the exception is propagated, so that it is not leaked.
private void setupRemoteAsyncecQueue(Atom nameAtom, int port)
  throws IOException
{
  // Socket to be used for the new queue
  Socket newSocket;
  // try to connect the new socket to the address specified during the remote
  // protocol initialisation and the port specified above.
  try
  {
    newSocket = new Socket(hostAddress, port);
  }
  catch(IOException e) // thrown if (for example), something else
                       // has stolen the connection.
  {
    // inform ECLiPSe that socket creation failed
    writeControl(new CompoundTermImpl("socket_connect", nameAtom, failAtom));
    // throw the exception
    throw e;
  }

  // true once the socket has been handed over to the queue bookkeeping
  boolean socketRegistered = false;
  try
  {
    // inform ECLiPSe that socket creation succeeded
    writeControl(new CompoundTermImpl("socket_connect", nameAtom, successAtom));

    // read the next term from the control connection; anything which is not
    // a compound term (including null) violates the protocol
    Object response = readControl();
    if(!(response instanceof CompoundTerm))
    {
      throw new IOException("Remote interface protocol error.");
    }
    CompoundTerm result2 = (CompoundTerm) response;

    // expected: socket_accept(QueueName, StreamId)
    if(!result2.functor().equals("socket_accept") ||
       result2.arity() != 2 ||
       !(result2.arg(1) instanceof Atom) ||
       !(result2.arg(2) instanceof Integer) ||
       !((Atom) result2.arg(1)).equals(nameAtom))
    {
      throw new IOException("Remote interface protocol error.");
    }

    // extract the stream number id of the named queue
    int id = ((Integer) result2.arg(2)).intValue();
    setupAsyncecInfo(id, newSocket);
    socketRegistered = true;
  }
  finally
  {
    if(!socketRegistered)
    {
      try
      {
        newSocket.close();
      }
      catch(IOException ignored)
      {
        // best effort: the original failure is the one worth reporting
      }
    }
  }
}
//-------------------------------------------------------------------
// When true, the next call to testTerminated() performs a unilateral
// disconnect before checking for termination. unilateralDisconnect() resets
// the flag to false as its first action.
// NOTE(review): the code which sets this flag to true is not visible in this
// part of the file -- presumably an I/O failure path; confirm.
boolean shouldDisconnect = false;
// Extends the superclass check: if a disconnect has been requested via
// shouldDisconnect, carry it out first, then delegate to the superclass
// implementation of testTerminated().
void testTerminated() throws EclipseTerminatedException
{
if(shouldDisconnect)
{
unilateralDisconnect();
}
super.testTerminated();
}
// Invoked when flush() is called on a ToEclipseQueue: announces the flush to
// ECLiPSe, pushes the buffered bytes out and lets ECLiPSe run until it
// yields again.
synchronized void flushStream(int streamID) throws IOException
{
  // write rem_flushio(Queuenumber, Bytes) on control connection & flush
  Integer queueNumber = new Integer(streamID);
  Integer pendingBytes = new Integer(getBytesBuffered(streamID));
  writeControl(new CompoundTermImpl("rem_flushio", queueNumber, pendingBytes));

  // get the buffer to flush (in the background) to its destination
  getOutputStream(streamID).flush();

  // everything buffered so far has been flushed: restart the byte count
  setBytesBuffered(streamID, 0);

  // hand control to Eclipse and handle any resulting events until yield
  waitForEclipse(false);
}
// Writes a single byte to the output stream of the given queue and counts it
// as buffered, so that the next flush can report the right amount.
synchronized void writeByteToStream(int streamid, byte b) throws IOException
{
  // mask to the low eight bits so the byte is passed as a value in 0..255
  int unsignedValue = b & 0xff;
  getOutputStream(streamid).write(unsignedValue);

  int bufferedSoFar = getBytesBuffered(streamid);
  setBytesBuffered(streamid, bufferedSoFar + 1);
}
// Writes len bytes of b, starting at off, to the output stream of the given
// queue, adds them to the buffered-byte count and returns the number of
// bytes written.
synchronized int writeToStream(int streamid, byte[] b, int off, int len) throws IOException
{
  getOutputStream(streamid).write(b, off, len);

  // The write above either transfers all len bytes or throws, so reaching
  // this point means exactly len more bytes are buffered.
  int bufferedAfterWrite = getBytesBuffered(streamid) + len;
  setBytesBuffered(streamid, bufferedAfterWrite);

  return len;
}
// Called if the available() method of a FromEclipseQueue is called.
// Returns whatever availableInBuffer reports for the given stream id.
synchronized int availableOnStream(int streamID)
{
return(this.availableInBuffer(streamID));
}
// called by the read method in FromEclipseQueue. Returns -1 if there are no
// bytes in the buffer. Delegates to readByteFromBuffer for the given
// stream id.
synchronized int readByteFromStream(int streamID) throws IOException
{
return(readByteFromBuffer(streamID));
}
// called by the read methods of FromEclipseQueue. Returns the number of
// bytes read (may be less than len).
// Note the parameter order here is (streamid, off, len, b), whereas the
// underlying readBytesFromBuffer is called with (streamid, b, off, len).
synchronized int readFromStream(int streamid, int off, int len, byte[] b) throws IOException
{
return(readBytesFromBuffer(streamid, b, off, len));
}
// Optionally hands control to ECLiPSe by writing "resume" on the control
// connection, then reads the next control term and translates it into the
// matching ControlSignal object. Returns null for an unrecognised term.
// The resume is skipped only on the first iteration of a control transfer
// which was requested without resume.
ControlSignal getNextControlSignal(boolean isFirstIteration,
                                   boolean transferControlWithResume)
  throws IOException
{
  boolean mustResume = transferControlWithResume || !isFirstIteration;
  if(mustResume)
  {
    writeControl(resumeAtom);
  }

  CompoundTerm term = getNextControlTerm();

  // stays null if none of the recognisers below accepts the term
  ControlSignal signal = null;
  if(signalsYield(term))
  {
    signal = new YieldSignal();
  }
  else if(signalsMultilateralDisconnect(term))
  {
    signal = new MultilateralDisconnectSignal();
  }
  else if(signalsUnilateralDisconnect(term))
  {
    signal = new UnilateralDisconnectSignal();
  }
  else if(signalsFlushIO(term))
  {
    Integer streamId = (Integer) term.arg(1);
    Integer byteCount = (Integer) term.arg(2);
    signal = new FlushIOSignal(streamId, byteCount);
  }
  else if(signalsWaitIO(term))
  {
    signal = new WaitIOSignal((Integer) term.arg(1));
  }
  else if(signalsCloseQueue(term))
  {
    signal = new CloseQueueSignal((Integer) term.arg(1));
  }
  else if(signalsOpenQueue(term))
  {
    signal = new RemoteOpenQueueSignal((Integer) term.arg(1),
                                       (Atom) term.arg(2),
                                       (Atom) term.arg(3),
                                       (Atom) term.arg(4));
  }
  return signal;
}
/**
* Send an RPC goal to ECLiPSe. The {@code rpc} atom is first written on the
* control connection, then the goal itself is written on the rpc connection.
*
* @param goal the goal to be sent.
* @throws IOException if writing to either connection fails.
*/
void sendGoal(Object goal) throws IOException
{
// announce on the control connection that a goal follows on the rpc socket
writeControl(rpcAtom);
writeRPC(goal);
// System.out.println("Sent goal on rpc connection: "+goal);
}
/**
* Receive an RPC goal from ECLiPSe: reads the next term from the EXDR input
* stream of the rpc connection.
*
* @return the term read from the rpc connection.
* @throws IOException if reading from the rpc connection fails.
*/
Object receiveGoal() throws IOException
{
return(rpcEXDRInput.readTerm());
}
// Closes the Java side of a FromEclipseQueue: performs the superclass
// clean-up first, then closes the queue's socket and removes the
// bookkeeping information held for this stream id.
void closeFromEclipseStreamJavaSide(int streamid) throws IOException
{
super.closeFromEclipseStreamJavaSide(streamid);
closeFromecSocket(streamid);
removeInfo(streamid);
}
// Closes the Java side of a ToEclipseQueue: performs the superclass clean-up,
// closes the queue's output stream, closes the queue's socket and resets the
// buffered-byte count.
// The socket is closed and the counter reset even when closing the output
// stream throws, so that a failing stream does not leak the socket; the
// IOException is still propagated to the caller.
void closeToEclipseStreamJavaSide(int streamid) throws IOException
{
  super.closeToEclipseStreamJavaSide(streamid);
  try
  {
    getOutputStream(streamid).close();
  }
  finally
  {
    closeToecSocket(streamid);
    setBytesBuffered(streamid, 0);
  }
}
// Closes the ECLiPSe side of a FromEclipseQueue: after the superclass
// processing, asks ECLiPSe to close the queue and checks that it answers
// with "yield".
void closeFromEclipseStreamEclipseSide(int streamid) throws IOException
{
  super.closeFromEclipseStreamEclipseSide(streamid);

  // write queue_close(streamid) on the control connection
  Integer queueNumber = new Integer(streamid);
  writeControl(new CompoundTermImpl("queue_close", queueNumber));

  // the only acceptable reply on the control connection is a yield signal
  Object reply = readControl();
  if(reply.equals(yieldAtom))
  {
    return;
  }
  throw new IOException("Remote protocol error.");
}
// Closes the ECLiPSe side of a ToEclipseQueue: after the superclass
// processing, asks ECLiPSe to close the queue and checks that it answers
// with "yield".
void closeToEclipseStreamEclipseSide(int streamid) throws IOException
{
  super.closeToEclipseStreamEclipseSide(streamid);

  // write queue_close(streamid) on the control connection
  Integer queueNumber = new Integer(streamid);
  writeControl(new CompoundTermImpl("queue_close", queueNumber));

  // the only acceptable reply on the control connection is a yield signal
  Object reply = readControl();
  if(reply.equals(yieldAtom))
  {
    return;
  }
  throw new IOException("Remote protocol error.");
}
// Closes the ECLiPSe side of an asynchronous queue: after the superclass
// processing, asks ECLiPSe to close the queue and checks that it answers
// with "yield".
void closeAsyncEclipseStreamEclipseSide(int streamid) throws IOException
{
  super.closeAsyncEclipseStreamEclipseSide(streamid);

  // write queue_close(streamid) on the control connection
  Integer queueNumber = new Integer(streamid);
  writeControl(new CompoundTermImpl("queue_close", queueNumber));

  // the only acceptable reply on the control connection is a yield signal
  Object reply = readControl();
  if(reply.equals(yieldAtom))
  {
    return;
  }
  throw new IOException("Remote protocol error.");
}
/**
* This buffer is used to store FromEclipseQueue data flushed by eclipse
* through the socket after an ec_flushio signal. The buffer is initialised
* with a DataInputStream (wrapped around the Socket's input stream). When
* instructed, it can read a certain number of bytes from this stream into
* the buffer, using the readBytesFromSocket method.
*
* These bytes can be read back from the buffer using the readByte or
* readBytes methods. Any amount can be read at a time as long as it does
* not exceed the amount available.
*
*/
private class FromEclipseQueueBuffer
{
// number of available (i.e. stored but not yet read) bytes held in the
// buffer; decremented by readByte
private int available;
// DataInputStream where the bytes come from. We use a DataInputStream
// because it has the readFully method which blocks until either the
// specified number of bytes have been read, the end-of-file character has
// been reached, or there is an IOException
private DataInputStream socketInputStream;
// A queue storing chunks of bytes. Each chunk is the result of a call to
// readBytesFromSocket. Chunks are taken from the front (index 0) by
// nextChunk.
private Vector byteChunkVector;
// The byte chunk in the buffer from where the first byte should be read;
// null until a chunk has been made current
private byte[] currentByteChunk;
// The number of bytes which have already been read from the first byte
// chunk, i.e. the index in currentByteChunk of the next byte to read
private int readFromCurrentByteChunk;
// Creates an empty buffer which will be filled from the given stream (the
// input stream of the queue's socket).
FromEclipseQueueBuffer(InputStream socketInputStream)
{
  // nothing buffered yet: no pending chunks, no current chunk, no bytes
  byteChunkVector = new Vector();
  currentByteChunk = null;
  readFromCurrentByteChunk = 0;
  available = 0;
  // wrap the source so that readFully is available
  this.socketInputStream = new DataInputStream(socketInputStream);
}
// return the number of unread bytes stored in the buffer (some read bytes
// will be stored in the currentByteChunk until it is exhausted).
// This is the value of the available counter, not a query on the socket.
int available()
{
return(available);
}
// Advances currentByteChunk to the next chunk in the queue: the chunk at the
// front of byteChunkVector is removed and becomes current, or, if no chunk
// is queued, currentByteChunk becomes null. In both cases reading restarts
// at offset 0.
private void nextChunk()
{
  currentByteChunk =
    byteChunkVector.isEmpty() ? null : (byte []) byteChunkVector.remove(0);
  // we read from the beginning of the new currentByteChunk
  readFromCurrentByteChunk = 0;
}
// Reads one byte from the buffer and returns it as a value between 0 and
// 255, or returns -1 if the buffer holds no unread bytes.
int readByte()
{
  if(available == 0)
  {
    return -1;
  }

  // Current chunk used up: switch to the next one. Since available > 0,
  // next chunk cannot be null.
  boolean chunkExhausted =
    (readFromCurrentByteChunk == currentByteChunk.length);
  if(chunkExhausted)
  {
    nextChunk();
  }

  // One fewer bytes is available
  available--;

  byte stored = currentByteChunk[readFromCurrentByteChunk++];
  // a Java byte ranges over -128..127; masking with 0xff maps negative
  // values onto 128..255 and leaves non-negative values unchanged
  return stored & 0xff;
}
// read