1// BEGIN LICENSE BLOCK
2// Version: CMPL 1.1
3//
4// The contents of this file are subject to the Cisco-style Mozilla Public
5// License Version 1.1 (the "License"); you may not use this file except
6// in compliance with the License.  You may obtain a copy of the License
7// at www.eclipse-clp.org/license.
8//
9// Software distributed under the License is distributed on an "AS IS"
10// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.  See
11// the License for the specific language governing rights and limitations
12// under the License.
13//
14// The Original Code is  The ECLiPSe Constraint Logic Programming System.
15// The Initial Developer of the Original Code is  Cisco Systems, Inc.
16// Portions created by the Initial Developer are
17// Copyright (C) 2001 - 2006 Cisco Systems, Inc.  All Rights Reserved.
18//
19// Contributor(s): Josh Singer, Parc Technologies
20//
21// END LICENSE BLOCK
22//Title:        Java/ECLiPSe interface
23//Version:      $Id: RemoteEclipse.java,v 1.1 2006/09/23 01:54:12 snovello Exp $
24//Author:       Josh Singer
25//Company:      Parc Technologies
26//Description:  Connection to a remote ECLiPSe.
27package com.parctechnologies.eclipse;
28import java.io.*;
29import java.net.*;
30import java.util.*;
31
32
33
34/**
35 * Remote connection to an existing ECLiPSe process. The connection may be made
36 * over a TCP/IP network to an ECLiPSe which has been primed using the
37 * <code>remote_connect/3</code> or the <code>remote_connect_setup/3</code>
38 * builtin predicates. As well as the functionality provided by the
39 * <i>EclipseConnection</i> interface, <i>RemoteEclipse</i> also allows for
40 * execution control to be transferred explicity over to ECLiPSe with the
41 * <code>resume()</code> method.
42 *
43 * <p>The connection is terminated from the Java side using either the
44 * <code>disconnect()</code> method (when Java has execution control) or
45 * <code>unilateralDisconnect()</code> (when ECLiPSe has execution control).
46 * If the builtin predicate <code>remote_disconnect/1</code> is executed on
47 * the ECLiPSe side, the effect on the Java side are similar to the
48 * effects of <code>disconnect()</code>.
49 *
50 */
51public class RemoteEclipse extends EclipseConnectionImpl implements EclipseConnection
52{
53  /**
54   * Implementation overview
55   *
56   * I assume familiarity with the Eclips remote interface protocol.
57   *
58   * The main complication to the implementation of this class is the fact that
59   * ECLiPSe sockets have a very small fixed buffer size. Consequences:
60   * (1) When a client writes to an Eclipse socket and the socket's buffer
61   * reaches capacity, the client's write/flush command blocks indefinitely
62   * unless Eclipse is reading from the socket's stream. Because of this block,
63   * any read call which the user may set up using rpc is never reached.
64   *
65   * (2) Conversely, when Eclipse flushes an amount of data through the socket
66   * stream which is larger than capacity, its own write command blocks
67   * indefinitely unless there is a corresponding read command waiting on the
68   * Java side. However, the user cannot set up this read after Eclipse flushes,
69   * because control does not return to Java until after the write command
70   * completes.
71   *
72   * The way we handle (1) is to enclose the socket OutputStream in a special
73   * NonBlockingOutputStream, whose write and flush methods are non-blocking
74   * A thread is hidden within each NonBlockingOutputStream object. The thread
75   * is activated whenever the NonBlockingOutputStream is flushed and handles
76   * the write and flush calls to the underlying stream (the socket in this
77   * case) concurrently. The hidden thread may block, but the original thread
78   * is unblocked. It can therefore go ahead and set up a read call on the
79   * Eclipse side, which will remove the blockage, and read the data written by
80   * the hidden stream.
81   *
82   * (2) is handled using a buffer on the Java side. Before attempting to write
83   * the data, Eclipse sends an ec_flushio signal, indicating the amount about
84   * to be written. This amount of data is immediately read into a buffer.
85   * From there it can be read by the user later. See the
86   * FromEclipseQueueBuffer class for more details.
87   *
88   */
89
90
91
92  // The socket for sending/receiving control signals
93  private Socket control;
94  // The socket for sending/receiving rpc goals
95  private Socket rpc;
96  // The name of the Eclipse remote connection which this object is connected
97  // to.
98  private String connectionName;
99  // Stream for receiving control signals in EXDR format
100  private EXDRInputStream controlEXDRInput;
101  // Stream for receiving rpc goals in EXDR format
102  private EXDRInputStream rpcEXDRInput;
103  // Stream for sending control signals in EXDR format
104  private EXDROutputStream controlEXDROutput;
105  // Stream for sending rpc goals in EXDR format
106  private EXDROutputStream rpcEXDROutput;
107  // common atoms
108  private static final Atom resumeAtom = new Atom("resume");
109  private static final Atom rpcAtom = new Atom("rpc");
110  private static final Atom yieldAtom = new Atom("yield");
111  private static final Atom disconnectAtom = new Atom("disconnect");
112  private static final Atom disconnectYieldAtom = new Atom("disconnect_yield");
113  private static final Atom disconnectResumeAtom = new Atom("disconnect_resume");
114  private static final Atom syncAtom = new Atom("sync");
115  private static final Atom asyncAtom = new Atom("async");
116  private static final Atom fromecAtom = new Atom("fromec");
117  private static final Atom toecAtom = new Atom("toec");
118  private static final Atom bidirectAtom = new Atom("bidirect");
119  private static final Atom emptyAtom = new Atom("");
120  private static final Atom failAtom = new Atom("fail");
121  private static final Atom successAtom = new Atom("success");
122
123  private Map queueInfo = new HashMap();
124
125  // The address of the machine which Eclipse is running on, as read during the
126  // protocol. This should be used in subsequent client socket connections.
127  private InetAddress hostAddress;
128
129  private static final int PROTOCOL_VERSION_NUMBER = 1;
130
131  public static final int DEFAULT_TIMEOUT_MILLIS = 30000;
132
133  /**
134   * Make a connection to an existing ECLiPSe process. The ECLiPSe process must
135   * be on a server machine which is reachable on a TCP/IP network. ECLiPSe must be
136   * primed using <code>remote_connect_setup/3</code>. The builtin predicate
137   * <code>remote_connect_accept/6</code> should then be used to complete the
138   * connection. The connection details
139   * (IP address, port number, password) are specified as parameters and must match those
140   * specified/returned as arguments in the execution of
141   * <code>remote_connect_setup/3</code> and <code>remote_connect_accept/6</code>.
142   *
143   * @throws IOException if the connection could not be made, or times out
144   * within <code>DEFAULT_TIMEOUT_MILLIS</code> milliseconds.
145   */
146  public RemoteEclipse(InetAddress remoteEclipseHost, int remoteEclipsePort,
147                       String passwd)
148    throws IOException
149  {
150    this(remoteEclipseHost, remoteEclipsePort, passwd, DEFAULT_TIMEOUT_MILLIS);
151  }
152
153  /**
154   * Make a connection to an existing ECLiPSe process. The ECLiPSe process must be
155   *
156   * on a server machine which is reachable on a TCP/IP network. ECLiPSe must be
157   * primed using <code>remote_connect/3</code>. The connection details
158   * (IP address, port number) are specified as parameters and must match those
159   * specified/returned as arguments in the execution of
160   * <code>remote_connect/3</code>. If <code>remote_connect_setup/3</code>. was
161   * used to prime ECLiPSe for the remote connection, this constructor will fail
162   * as it does not use a password.
163   *
164   * @throws IOException if the connection could not be made, or times out
165   * within <code>DEFAULT_TIMEOUT_MILLIS</code> milliseconds.
166   */
167  public RemoteEclipse(InetAddress remoteEclipseHost, int remoteEclipsePort)
168    throws IOException
169  {
170    this(remoteEclipseHost, remoteEclipsePort, "", DEFAULT_TIMEOUT_MILLIS);
171  }
172
173  /**
174   * Make a connection to an existing ECLiPSe process. The ECLiPSe process must
175   * be on a server machine which is reachable on a TCP/IP network. ECLiPSe must be
176   * primed using <code>remote_connect/3</code>. The connection details
177   * (IP address, port number) are specified as parameters and must match those
178   * specified/returned as arguments in the execution of
179   * <code>remote_connect/3</code>. If <code>remote_connect_setup/3</code>. was
180   * used to prime ECLiPSe for the remote connection, this constructor will fail
181   * as it does not use a password.
182   *
183   * @param timeoutMillis number of milliseconds to wait for the initial
184   * connection to be established before throwing an exception. Set
185   * <code>timeoutMillis</code> to 0 to wait indefinitely for the connection.
186   *
187   * @throws IOException if the connection could not be made, or times out
188   * within <code>timeoutMillis</code> milliseconds.
189   */
190  public RemoteEclipse(InetAddress remoteEclipseHost, int remoteEclipsePort,
191                       int timeoutMillis)
192    throws IOException
193  {
194    this(remoteEclipseHost, remoteEclipsePort, "", timeoutMillis);
195  }
196
197  private void setUpControl(InetAddress remoteEclipseHost,
198                            int remoteEclipsePort)
199                            throws IOException
200  {
201    control = new Socket(remoteEclipseHost, remoteEclipsePort);
202    // initialise the EXDR readers and writers on the control socket
203    controlEXDRInput = new EXDRInputStream(control.getInputStream());
204    controlEXDROutput = new EXDROutputStream(control.getOutputStream());
205  }
206
207  private void setUpRPC(InetAddress remoteEclipseHost, int remoteEclipsePort)
208                        throws IOException
209  {
210    rpc = new Socket(remoteEclipseHost, remoteEclipsePort);
211    // Initialise readers and writers on these sockets
212    rpcEXDRInput = new EXDRInputStream(rpc.getInputStream());
213    rpcEXDROutput = new EXDROutputStream(rpc.getOutputStream());
214  }
215
216
217
218  /**
219   * Make a connection to an existing ECLiPSe process. The ECLiPSe process must
220   * be on a server machine which is reachable on a TCP/IP network. ECLiPSe must be
221   * primed using <code>remote_connect_setup/3</code>. The builtin predicate
222   * <code>remote_connect_accept/6</code> should then be used to complete the
223   * connection. The connection details
224   * (IP address, port number, password) are specified as parameters and must match those
225   * specified/returned as arguments in the execution of
226   * <code>remote_connect_setup/3</code> and <code>remote_connect_accept/6</code>.
227   *
228   * @param timeoutMillis number of milliseconds to wait for the initial
229   * connection to be established before throwing an exception. Set
230   * <code>timeoutMillis</code> to 0 to wait indefinitely for the connection.
231   *
232   * @throws IOException if the connection could not be made, or times out
233   * within <code>timeoutMillis</code> milliseconds.
234   */
235  public RemoteEclipse(InetAddress remoteEclipseHost, int remoteEclipsePort,
236                       String passwd, int timeoutMillis)
237    throws IOException
238  {
239    hostAddress = remoteEclipseHost;
240
241    // System.out.println("starting remote protocol");
242
243    setUpControl(remoteEclipseHost, remoteEclipsePort);
244
245    CompoundTerm protocolTerm =
246      new CompoundTermImpl("remote_protocol",
247                           new Integer(PROTOCOL_VERSION_NUMBER));
248
249    //System.out.println("protocol term is "+protocolTerm);
250
251    writeControl(protocolTerm);
252
253    Object presponse = readControl();
254    //System.out.println("presponse is "+presponse);
255
256    if(!presponse.equals("yes"))
257    {
258      //System.out.println("presponse not equal to \"yes\", closing control");
259      control.close();
260      throw(new IOException("Remote protocol error: protocol version unsupported."));
261    }
262
263    //System.out.println("presponse equal to \"yes\", continuing protocol");
264
265    writeControl(passwd);
266
267    // read the connectionName on the control socket
268    connectionName = ((Atom) readControlTimeout(timeoutMillis)).functor();
269
270    // write the language name on the control socket
271    writeControl("java");
272    // System.out.println("Read connection name: "+connectionName);
273    setUpRPC(remoteEclipseHost, remoteEclipsePort);
274
275    if(!connectionName.equals(((Atom) readRPCTimeout(timeoutMillis)).functor()))
276    {
277      throw(new IOException("Remote protocol error."));
278    }
279    // set up the peer name locally
280    setPeerName(new Atom(connectionName));
281
282    // System.out.println("remote protocol complete");
283  }
284
285
286  /**
287   * Terminate the connection with ECLiPSe unilaterally. This method should be
288   * invoked in unforseen circumstances when the connection should be terminated
289   * while ECLiPSe has execution control. After
290   * <code>unilateralDisconnect</code> has
291   * been invoked, public methods invoked on this <i>RemoteEclipse</i> will
292   * throw <i>EclipseTerminatedException</i>s.
293   *
294   * @throws EclipseTerminatedException if the connection has already been
295   * terminated.
296   */
297  public void unilateralDisconnect() throws EclipseTerminatedException
298  {
299    shouldDisconnect = false; // Avoid recursive calls to unilateralDisconnect
300    testTerminated();
301    try
302    {
303      // don't use writeControl here as that might cause a recursive invocation
304      // of unilateralDisconnect.
305      controlEXDROutput.write(disconnectResumeAtom);
306      controlEXDROutput.flush();
307    }
308    catch(Exception e) {}
309
310    try
311    {
312      terminateJavaSide();
313    }
314    catch(IOException ioe) {}
315
316  }
317
318  /**
319   * Terminate the remote connection to ECLiPSe. This should be invoked while
320   * Java has control. If ECLiPSe has control then use
321   * <code>unlateralDisconnect</code> instead. After <code>disconnect</code> has
322   * been invoked, public methods invoked on this <i>RemoteEclipse</i> will
323   * throw <i>EclipseTerminatedException</i>s.
324   *
325   * @throws EclipseTerminatedException if the connection has already been
326   * terminated.
327   * @throws IOException if there was a problem communicating with ECLiPSe
328   * during disconnection.
329   */
330  public synchronized void disconnect() throws IOException
331  {
332    testTerminated();
333    // write the disconnect atom on the control connection
334    writeControl(disconnectAtom);
335
336    // read "disconnect_yield" on the control connection
337    Object result = null;
338    try
339    {
340      result = readControl();
341    }
342    catch(IOException ioe)
343    {
344      // don't mind if the connection has already been lost
345    }
346
347    if(result != null && !result.equals(disconnectYieldAtom))
348    {
349      throw new IOException("Remote protocol error.");
350    }
351    terminateJavaSide();
352  }
353
354
355  private void terminateJavaSide() throws IOException
356  {
357    // set this object to terminated
358    terminated = true;
359
360    // close all user queues, but not the eclipse sides
361    closeAllQueues(false);
362
363    // try to close control and rpc sockets, but if this fails, don't worry.
364    try
365    {
366      control.close();
367    }
368    catch(Exception e){}
369    try
370    {
371      rpc.close();
372    }
373    catch(Exception e){}
374
375  }
376
377  /**
378   * Explicitly transfer execution control to ECLiPSe. ECLiPSe will resume
379   * execution immediately after the last goal which transferred control to
380   * Java (normally <code>remote_connect/3</code> or
381   * <code>remote_connect_setup/3</code>). This method should not be
382   * invoked while control has been transferred to Java using a
383   * <i>QueueListener</i>. An invocation of <code>resume()</code> should
384   * normally be paired with an execution of the builtin predicate
385   * <code>remote_yield/1</code>, which can return execution control to Java.
386   *
387   * @throws EclipseTerminatedException if the connection to ECLiPSe has been
388   * terminated.
389   * @throws IOException if there was a problem communicating with ECLiPSe.
390   */
391  public synchronized void resume() throws IOException
392  {
393    testTerminated();
394    waitForEclipse(true);
395  }
396
397  // called to perform setup code additional to what is done in
398  // EclipseConnectionImpl
399  void setupFromEclipseQueue(String name) throws EclipseException, IOException
400  {
401    // port to be used on eclipse side for socket
402    int port;
403    // Atoms for name and connectionName (used in rpc calls)
404    Atom nameAtom = new Atom(name);
405    // compound term returned by ECLiPSe during interaction
406    CompoundTerm result1 = null;
407    // write queue_create(QueueName, sync, fromec, '') on the control connection
408    writeControl(new CompoundTermImpl("queue_create", nameAtom, syncAtom,
409                                      fromecAtom, emptyAtom));
410
411    // read the next term from the control connection
412    try
413    {
414      result1 = (CompoundTerm) readControl();
415    }
416    catch(ClassCastException cce)
417    {
418      throw new IOException("Remote interface protocol error.");
419    }
420
421
422    if(result1.equals(yieldAtom))
423    {
424      throw new EclipseException("Could not create ECLiPSe side of queue.");
425    }
426
427    // System.out.println("result1 = "+result1);
428
429    // check that the response obeys the protocol
430    if(!result1.functor().equals("socket_client") ||
431       result1.arity() != 4 ||
432       !(result1.arg(1) instanceof Integer) ||
433       !(result1.arg(2) instanceof Atom) ||
434       !(result1.arg(3) instanceof Atom) ||
435       !(result1.arg(4) instanceof Atom) ||
436       !((Atom) result1.arg(2)).equals(nameAtom) ||
437       !((Atom) result1.arg(3)).functor().equals("sync") ||
438       !((Atom) result1.arg(4)).functor().equals("fromec"))
439    {
440      throw new IOException("Remote interface protocol error.");
441    }
442
443    // extract port number from the response
444    port = ((Integer) result1.arg(1)).intValue();
445
446    setupRemoteFromecQueue(nameAtom, port);
447
448    // send the resume message + wait for yield.
449    resume();
450  }
451
452  private void setupRemoteFromecQueue(Atom nameAtom, int port)
453    throws IOException
454  {
455    // result term received during interaction
456    CompoundTerm result2 = null;
457    // Socket to be used for the new queue
458    Socket newSocket;
459
460    // try to connect the new socket to the address specified during the remote
461    // protocol initialisation and the port specified above.
462    try
463    {
464      newSocket = new Socket(hostAddress, port);
465    }
466    catch(IOException e) // thrown if (for example), something else
467                               // has stolen the connection.
468    {
469      // inform ECLiPSe that socket creation failed
470      writeControl(new CompoundTermImpl("socket_connect", nameAtom, failAtom));
471      // throw the exception
472      throw e;
473    }
474    // otherwise inform ECLiPSe that socket creation succeeded
475    writeControl(new CompoundTermImpl("socket_connect", nameAtom, successAtom));
476
477    // read the next term from the control connection
478    try
479    {
480      result2 = (CompoundTerm) readControl();
481    }
482    catch(ClassCastException cce)
483    {
484      throw new IOException("Remote interface protocol error.");
485    }
486
487    // check that this obeys the protocol
488    if(!result2.functor().equals("socket_accept") ||
489       result2.arity() != 2 ||
490       !(result2.arg(1) instanceof Atom) ||
491       !(result2.arg(2) instanceof Integer) ||
492       !((Atom) result2.arg(1)).equals(nameAtom))
493    {
494      throw new IOException("Remote interface protocol error.");
495    }
496    // extract the stream number id of the named queue
497    Integer Id = (Integer) (result2.arg(2));
498    int id = Id.intValue();
499
500    setupFromecInfo(id, newSocket);
501  }
502
503
504
505  // called to perform setup code additional to what is done in
506  // EclipseConnectionImpl
507  void setupToEclipseQueue(String name) throws EclipseException, IOException
508  {
509    // port to be used on eclipse side for socket
510    int port;
511    // Atoms for name and connectionName (used in rpc calls)
512    Atom nameAtom = new Atom(name);
513    // compound term returned by ECLiPSe during interaction
514    CompoundTerm result1 = null;
515    // write queue_create(QueueName, sync, fromec, '') on the control connection
516    writeControl(new CompoundTermImpl("queue_create", nameAtom, syncAtom,
517                                      toecAtom, emptyAtom));
518
519    // read the next term from the control connection
520    try
521    {
522      result1 = (CompoundTerm) readControl();
523    }
524    catch(ClassCastException cce)
525    {
526      throw new IOException("Remote interface protocol error.");
527    }
528
529
530    if(result1.equals(yieldAtom))
531    {
532      throw new EclipseException("Could not create ECLiPSe side of queue.");
533    }
534
535    // System.out.println("result1 = "+result1);
536
537    // check that the response obeys the protocol
538    if(!result1.functor().equals("socket_client") ||
539       result1.arity() != 4 ||
540       !(result1.arg(1) instanceof Integer) ||
541       !(result1.arg(2) instanceof Atom) ||
542       !(result1.arg(3) instanceof Atom) ||
543       !(result1.arg(4) instanceof Atom) ||
544       !((Atom) result1.arg(2)).equals(nameAtom) ||
545       !((Atom) result1.arg(3)).functor().equals("sync") ||
546       !((Atom) result1.arg(4)).functor().equals("toec"))
547    {
548      throw new IOException("Remote interface protocol error.");
549    }
550
551    // extract port number from the response
552    port = ((Integer) result1.arg(1)).intValue();
553
554    setupRemoteToecQueue(nameAtom, port);
555
556    // send the resume message + wait for yield
557    resume();
558  }
559
560
561  private void setupRemoteToecQueue(Atom nameAtom, int port)
562      throws IOException
563  {
564    // Socket to be used for the new queue
565    Socket newSocket;
566    // compound term returned by ECLiPSe during interaction
567    CompoundTerm result2 = null;
568    // try to connect the new socket to the address specified during the remote
569    // protocol initialisation and the port specified above.
570    try
571    {
572      newSocket = new Socket(hostAddress, port);
573    }
574    catch(IOException e) // thrown if (for example), something else
575                               // has stolen the connection.
576    {
577      // inform ECLiPSe that socket creation failed
578      writeControl(new CompoundTermImpl("socket_connect", nameAtom, failAtom));
579      // throw the exception
580      throw e;
581    }
582    // otherwise inform ECLiPSe that socket creation succeeded
583    writeControl(new CompoundTermImpl("socket_connect", nameAtom, successAtom));
584
585    // read the next term from the control connection
586    try
587    {
588      result2 = (CompoundTerm) readControl();
589    }
590    catch(ClassCastException cce)
591    {
592      throw new IOException("Remote interface protocol error.");
593    }
594
595    // check that this obeys the protocol
596    if(!result2.functor().equals("socket_accept") ||
597       result2.arity() != 2 ||
598       !(result2.arg(1) instanceof Atom) ||
599       !(result2.arg(2) instanceof Integer) ||
600       !((Atom) result2.arg(1)).equals(nameAtom))
601    {
602      throw new IOException("Remote interface protocol error.");
603    }
604    // extract the stream number id of the named queue
605    Integer Id = (Integer) (result2.arg(2));
606    int id = Id.intValue();
607
608    this.setupToecInfo(id, newSocket);
609  }
610
611//-------------------------------------------------------------------
612
613  // called to perform setup code additional to what is done in
614  // EclipseConnectionImpl
615  void setupAsyncEclipseQueue(String name) throws EclipseException, IOException
616  {
617    // port to be used on eclipse side for socket
618    int port;
619    // Atoms for name and connectionName (used in rpc calls)
620    Atom nameAtom = new Atom(name);
621    // compound term returned by ECLiPSe during interaction
622    CompoundTerm result1 = null;
623    // write queue_create(QueueName, sync, fromec, '') on the control connection
624    writeControl(new CompoundTermImpl("queue_create", nameAtom, asyncAtom,
625                                      bidirectAtom, emptyAtom));
626
627    // read the next term from the control connection
628    try
629    {
630      result1 = (CompoundTerm) readControl();
631    }
632    catch(ClassCastException cce)
633    {
634      throw new IOException("Remote interface protocol error.");
635    }
636
637
638    if(result1.equals(yieldAtom))
639    {
640      throw new EclipseException("Could not create ECLiPSe side of queue.");
641    }
642
643    // System.out.println("result1 = "+result1);
644
645    // check that the response obeys the protocol
646    if(!result1.functor().equals("socket_client") ||
647       result1.arity() != 4 ||
648       !(result1.arg(1) instanceof Integer) ||
649       !(result1.arg(2) instanceof Atom) ||
650       !(result1.arg(3) instanceof Atom) ||
651       !(result1.arg(4) instanceof Atom) ||
652       !((Atom) result1.arg(2)).equals(nameAtom) ||
653       !((Atom) result1.arg(3)).functor().equals("async") ||
654       !((Atom) result1.arg(4)).functor().equals("bidirect"))
655    {
656      throw new IOException("Remote interface protocol error.");
657    }
658
659    // extract port number from the response
660    port = ((Integer) result1.arg(1)).intValue();
661
662    setupRemoteAsyncecQueue(nameAtom, port);
663
664    // send the resume message + wait for yield
665    resume();
666  }
667
668
669  private void setupRemoteAsyncecQueue(Atom nameAtom, int port)
670      throws IOException
671  {
672    // Socket to be used for the new queue
673    Socket newSocket;
674    // compound term returned by ECLiPSe during interaction
675    CompoundTerm result2 = null;
676    // try to connect the new socket to the address specified during the remote
677    // protocol initialisation and the port specified above.
678    try
679    {
680      newSocket = new Socket(hostAddress, port);
681    }
682    catch(IOException e) // thrown if (for example), something else
683                               // has stolen the connection.
684    {
685      // inform ECLiPSe that socket creation failed
686      writeControl(new CompoundTermImpl("socket_connect", nameAtom, failAtom));
687      // throw the exception
688      throw e;
689    }
690    // otherwise inform ECLiPSe that socket creation succeeded
691    writeControl(new CompoundTermImpl("socket_connect", nameAtom, successAtom));
692
693    // read the next term from the control connection
694    try
695    {
696      result2 = (CompoundTerm) readControl();
697    }
698    catch(ClassCastException cce)
699    {
700      throw new IOException("Remote interface protocol error.");
701    }
702
703    // check that this obeys the protocol
704    if(!result2.functor().equals("socket_accept") ||
705       result2.arity() != 2 ||
706       !(result2.arg(1) instanceof Atom) ||
707       !(result2.arg(2) instanceof Integer) ||
708       !((Atom) result2.arg(1)).equals(nameAtom))
709    {
710      throw new IOException("Remote interface protocol error.");
711    }
712    // extract the stream number id of the named queue
713    Integer Id = (Integer) (result2.arg(2));
714    int id = Id.intValue();
715
716    setupAsyncecInfo(id, newSocket);
717  }
718//-------------------------------------------------------------------
719
720  boolean shouldDisconnect = false;
721
722  void testTerminated() throws EclipseTerminatedException
723  {
724    if(shouldDisconnect)
725    {
726      unilateralDisconnect();
727    }
728    super.testTerminated();
729  }
730
731  // Called if the flush() method of a ToEclipseQueue is called.
732  synchronized void flushStream(int streamID) throws IOException
733  {
734    // write rem_flushio(Queuenumber, Bytes) on control connection & flush
735    writeControl(new CompoundTermImpl("rem_flushio",
736                                      new Integer(streamID),
737                                      new Integer(getBytesBuffered(streamID))));
738    // get the buffer to flush (in the background) to its destination
739    getOutputStream(streamID).flush();
740    // Now these bytes have been flushed, reset the byte counter
741    setBytesBuffered(streamID, 0);
742    // hand control to Eclipse and handle any resulting events until yield
743    waitForEclipse(false);
744  }
745
746
747  synchronized void writeByteToStream(int streamid, byte b) throws IOException
748  {
749    getOutputStream(streamid).write(0xff & b);
750    setBytesBuffered(streamid, getBytesBuffered(streamid)+1);
751  }
752
753
754  synchronized int writeToStream(int streamid, byte[] b, int off, int len) throws IOException
755  {
756    getOutputStream(streamid).write(b, off, len);
757
758    // since the above statement cannot have written less than len bytes unless it
759    // threw an exception, if execution reaches the current point, we know len
760    // bytes have been written.
761
762    setBytesBuffered(streamid, getBytesBuffered(streamid)+len);
763    return(len);
764  }
765
766  // Called if the available() method of a FromEclipseQueue is called.
767  synchronized int availableOnStream(int streamID)
768  {
769    return(this.availableInBuffer(streamID));
770  }
771
772  // called by the read method in FromEclipseQueue. Returns -1 if there are no
773  // bytes in the buffer.
774  synchronized int readByteFromStream(int streamID) throws IOException
775  {
776    return(readByteFromBuffer(streamID));
777  }
778
779  // called by the read methods of FromEclipseQueue. Returns the number of
780  // bytes read (may be less than len).
781  synchronized int readFromStream(int streamid, int off, int len, byte[] b) throws IOException
782  {
783    return(readBytesFromBuffer(streamid, b, off, len));
784  }
785
786
787  ControlSignal getNextControlSignal(boolean isFirstIteration,
788                                     boolean transferControlWithResume)
789    throws IOException
790  {
791    if(transferControlWithResume || !isFirstIteration)
792    {
793      writeControl(resumeAtom);
794    }
795
796    CompoundTerm nextControlTerm = getNextControlTerm();
797
798    if(signalsYield(nextControlTerm))
799    {
800      return(new YieldSignal());
801    }
802
803    if(signalsMultilateralDisconnect(nextControlTerm))
804    {
805      return(new MultilateralDisconnectSignal());
806    }
807
808    if(signalsUnilateralDisconnect(nextControlTerm))
809    {
810      return(new UnilateralDisconnectSignal());
811    }
812
813    if(signalsFlushIO(nextControlTerm))
814    {
815      return(new FlushIOSignal((Integer) nextControlTerm.arg(1),
816                               (Integer) nextControlTerm.arg(2)));
817    }
818
819    if(signalsWaitIO(nextControlTerm))
820    {
821      return(new WaitIOSignal((Integer) nextControlTerm.arg(1)));
822    }
823
824    if(signalsCloseQueue(nextControlTerm))
825    {
826      return(new CloseQueueSignal((Integer) nextControlTerm.arg(1)));
827    }
828
829    if(signalsOpenQueue(nextControlTerm))
830    {
831      return(new RemoteOpenQueueSignal((Integer) nextControlTerm.arg(1),
832                                       (Atom) nextControlTerm.arg(2),
833                                       (Atom) nextControlTerm.arg(3),
834                                       (Atom) nextControlTerm.arg(4)));
835    }
836
837    // default, signifies unrecognised signal
838    return(null);
839  }
840
841
842  /**
843   * Send an RPC goal to ECLiPSe.
844   */
845  void sendGoal(Object goal) throws IOException
846  {
847    writeControl(rpcAtom);
848    writeRPC(goal);
849    // System.out.println("Sent goal on rpc connection: "+goal);
850  }
851
852  /**
853   * Receive an RPC goal from ECLiPSe.
854   */
855  Object receiveGoal() throws IOException
856  {
857    return(rpcEXDRInput.readTerm());
858  }
859
860  void closeFromEclipseStreamJavaSide(int streamid) throws IOException
861  {
862    super.closeFromEclipseStreamJavaSide(streamid);
863    closeFromecSocket(streamid);
864    removeInfo(streamid);
865  }
866
867  void closeToEclipseStreamJavaSide(int streamid) throws IOException
868  {
869    super.closeToEclipseStreamJavaSide(streamid);
870    getOutputStream(streamid).close();
871    closeToecSocket(streamid);
872    setBytesBuffered(streamid, 0);
873  }
874
875  void closeFromEclipseStreamEclipseSide(int streamid) throws IOException
876  {
877    super.closeFromEclipseStreamEclipseSide(streamid);
878    // write queue_close(streamid) on the control connection
879    writeControl(new CompoundTermImpl("queue_close",
880                 new Integer(streamid)));
881    // wait for a yield signal on the control connection
882    Object result = readControl();
883    if(!result.equals(yieldAtom))
884    {
885      throw new IOException("Remote protocol error.");
886    }
887  }
888
889  void closeToEclipseStreamEclipseSide(int streamid) throws IOException
890  {
891    super.closeToEclipseStreamEclipseSide(streamid);
892    // write queue_close(streamid) on the control connection
893    writeControl(new CompoundTermImpl("queue_close",
894                 new Integer(streamid)));
895    // wait for a yield signal on the control connection
896    Object result = readControl();
897    if(!result.equals(yieldAtom))
898    {
899      throw new IOException("Remote protocol error.");
900    }
901
902  }
903
904  void closeAsyncEclipseStreamEclipseSide(int streamid) throws IOException
905  {
906    super.closeAsyncEclipseStreamEclipseSide(streamid);
907    // write queue_close(streamid) on the control connection
908    writeControl(new CompoundTermImpl("queue_close",
909                 new Integer(streamid)));
910    // wait for a yield signal on the control connection
911    Object result = readControl();
912    if(!result.equals(yieldAtom))
913    {
914      throw new IOException("Remote protocol error.");
915    }
916
917  }
918
919  /**
920   * This buffer is used to store FromEclipseQueue data flushed by eclipse
921   * through the socket after an ec_flushio signal. The buffer is initialised
922   * with a DataInputStream (wrapped around the Socket's input stream). When
923   * instructed, it can read a certain number of bytes from this stream into
924   * the buffer, using the readBytesFromSocket method.
925   *
926   * These bytes can be read back from the socket using the readByte or
927   * readBytes methods. Any amount can be read at a time as long as it does
928   * not exceed the amount available.
929   *
930   */
931  private class FromEclipseQueueBuffer
932  {
933    // number of available bytes held in the buffer
934    private int available;
935    // DataInputStream where the bytes come from. We use a DataInputStream
936    // because it has the readFully method which blocks until either the
937    // specified number of bytes have been read, the end-of-file character has
938    // been reached, or there is an IOException
939    private DataInputStream socketInputStream;
940    // A queue storing chunks of bytes. Each chunk is the result of a call to
941    // readBytesFromSocket.
942    private Vector byteChunkVector;
943    // The byte chunk in the buffer from where the first byte should be read
944    private byte[] currentByteChunk;
945    // The number of bytes which have already been read from the first byte
946    // chunk
947    private int readFromCurrentByteChunk;
948
949    // constructor
950    FromEclipseQueueBuffer(InputStream socketInputStream)
951    {
952      // initialise instance variables
953      this.socketInputStream = new DataInputStream(socketInputStream);
954      available = 0;
955      byteChunkVector = new Vector();
956      readFromCurrentByteChunk = 0;
957      currentByteChunk = null;
958    }
959    // return the number of unread bytes stored in the buffer (some read bytes
960    // will be stored in the currentByteChunk until it is exhausted).
961    int available()
962    {
963      return(available);
964    }
965    // method to move currentByteChunk on to the next member of the queue.
966    // If the queue is non-empty, the first member is popped and becomes the
967    // current byte chunk. Otherwise the currentByteChunk is set to null.
968    private void nextChunk()
969    {
970      if(byteChunkVector.size() > 0)
971      {
972        currentByteChunk = (byte []) byteChunkVector.remove(0);
973      }
974      else
975      {
976        currentByteChunk = null;
977      }
978      // we read from the beginning of the new currentByteChunk
979      readFromCurrentByteChunk = 0;
980    }
981
982    // Method to read a byte from the buffer. Returns -1 if the buffer is empty
983    int readByte()
984    {
985      byte signed_byte;
986
987      if(available == 0)
988      {
989        return(-1);
990      }
991      // if we have reached the end of the current chunk, move on to the next.
992      // Since available > 0, next chunk cannot be null
993      if(readFromCurrentByteChunk == currentByteChunk.length)
994      {
995        nextChunk();
996      }
997      // One fewer bytes is available
998      available--;
999      // return the array element from the current byte chunk
1000      signed_byte = currentByteChunk[readFromCurrentByteChunk++];
1001      // in the byte chunk it is interpreted as between -128 and 127,
1002      // here we want a number between 0 and 255: if it is less than 0, add 256
1003      if(signed_byte < 0)
1004      {
1005        return(signed_byte+256);
1006      }
1007      else
1008      {
1009        return(signed_byte);
1010      }
1011    }
1012
1013    // read <len> bytes from the buffer into array b, at offset <off>.
1014    // returns the number of bytes copied.
1015    int readBytes(byte[] b, int off, int len)
1016    {
1017      // the number of bytes copied into b, the number of unread bytes in the
1018      // current byte chunk and the number of bytes still remaining to be
1019      // copied into b
1020      int bytesCopied = 0, availableInCurrentChunk, bytesRequired;
1021      // while we have not copied enough bytes and there are still bytes to
1022      // copy...
1023      while(bytesCopied < len && available > 0)
1024      {
1025        // calculate no. of remaining bytes in current chunk
1026        availableInCurrentChunk =
1027          currentByteChunk.length - readFromCurrentByteChunk;
1028        // calculate no. of bytes still to be copied
1029        bytesRequired = len - bytesCopied;
1030        // if we can't get all remaining bytes from the current chunk
1031        if(availableInCurrentChunk < bytesRequired)
1032        {
1033          // copy all remaining bytes from the current chunk into b
1034          System.arraycopy(currentByteChunk, readFromCurrentByteChunk,
1035                           b, off + bytesCopied, availableInCurrentChunk);
1036          // update available and bytesCopied accordingly
1037          available -= availableInCurrentChunk;
1038          bytesCopied += availableInCurrentChunk;
1039          // move on to the next chunk
1040          nextChunk();
1041        }
1042        else // If we can get all remaining bytes from the current chunk into b
1043        {
1044          // copy the required bytes
1045          System.arraycopy(currentByteChunk, readFromCurrentByteChunk,
1046                           b, off + bytesCopied, bytesRequired);
1047          // update available, bytesCopied and readFromCurrentByteChunk
1048          // accordingly
1049          available -= bytesRequired;
1050          bytesCopied += bytesRequired;
1051          readFromCurrentByteChunk += bytesRequired;
1052        }
1053      }
1054      // return the number of bytes copied
1055      return(bytesCopied);
1056    }
1057
1058    // read nBytes bytes from the input stream into the buffer. Block until this
1059    // is complete or it becomes impossible due to an exception.
1060    void readBytesFromSocket(int nBytes) throws IOException
1061    {
1062      // don't add an empty chunk if nBytes is 0
1063      if(nBytes == 0) return;
1064      // initialise a byte array for the new chunk
1065      byte[] newChunk = new byte[nBytes];
1066      // readFully the new chunk from the stream (blocks until complete or
1067      // throws exception)
1068      socketInputStream.readFully(newChunk);
1069      // add the new chunk on the end of the byteChunkVector queue.
1070      byteChunkVector.add(newChunk);
1071      // if the currentByteChunk was null because all the bytes were exhausted
1072      if(currentByteChunk == null)
1073      {
1074        // move it on to the first chunk
1075        nextChunk();
1076      }
1077      // update available
1078      available += nBytes;
1079    }
1080  }
1081
1082  private void writeControl(Object message) throws IOException
1083  {
1084    try
1085    {
1086      controlEXDROutput.write(message);
1087      controlEXDROutput.flush();
1088    }
1089    catch(SocketException e)
1090    {
1091      unilateralDisconnect();
1092      throw(new EclipseTerminatedException());
1093    }
1094  }
1095
1096  private void writeRPC(Object message) throws IOException
1097  {
1098    try
1099    {
1100      rpcEXDROutput.write(message);
1101      rpcEXDROutput.flush();
1102    }
1103    catch(SocketException e)
1104    {
1105      unilateralDisconnect();
1106      throw(new EclipseTerminatedException());
1107    }
1108  }
1109
1110  private Object readControl() throws IOException
1111  {
1112    try
1113    {
1114      return(controlEXDRInput.readTerm());
1115    }
1116    catch(EOFException e)
1117    {
1118      unilateralDisconnect();
1119      throw(new EclipseTerminatedException());
1120    }
1121  }
1122
1123  private Object readControlTimeout(int timeoutMillis) throws IOException
1124  {
1125    int old_timeout;
1126    old_timeout = control.getSoTimeout();
1127    try
1128    {
1129      control.setSoTimeout(timeoutMillis);
1130      return(readControl());
1131    }
1132    finally
1133    {
1134      control.setSoTimeout(old_timeout);
1135    }
1136  }
1137
1138
1139  private Object readRPCTimeout(int timeoutMillis) throws IOException
1140  {
1141    int old_timeout;
1142    old_timeout = rpc.getSoTimeout();
1143    try
1144    {
1145      rpc.setSoTimeout(timeoutMillis);
1146      return(readRPC());
1147    }
1148    finally
1149    {
1150      rpc.setSoTimeout(old_timeout);
1151    }
1152  }
1153
1154
1155
1156  private Object readRPC() throws IOException
1157  {
1158    try
1159    {
1160      return(rpcEXDRInput.readTerm());
1161    }
1162    catch(EOFException e)
1163    {
1164      unilateralDisconnect();
1165      throw(new EclipseTerminatedException());
1166    }
1167
1168  }
1169
1170
1171  /**
1172   * Finalizer method called when object is to be garbage collected
1173   */
1174  protected void finalize() throws IOException, EclipseException
1175  {
1176    this.unilateralDisconnect();
1177  }
1178
1179
1180  private CompoundTerm getNextControlTerm() throws IOException
1181  {
1182    Object nextControlObj = readControl();
1183
1184    if(!(nextControlObj instanceof CompoundTerm))
1185    {
1186      throw(new IOException("Remote interface protocol error: control object not CompoundTerm"));
1187    }
1188    return((CompoundTerm) nextControlObj);
1189
1190  }
1191
1192
1193  private boolean signalsYield(CompoundTerm controlTerm)
1194  {
1195    return(controlTerm.equals(yieldAtom));
1196  }
1197  private boolean signalsMultilateralDisconnect(CompoundTerm controlTerm)
1198  {
1199    return(controlTerm.equals(disconnectAtom));
1200  }
1201  private boolean signalsUnilateralDisconnect(CompoundTerm controlTerm)
1202  {
1203    return(controlTerm.equals(disconnectYieldAtom));
1204  }
1205  private boolean signalsFlushIO(CompoundTerm controlTerm)
1206  {
1207    return(controlTerm.functor().equals("ec_flushio") &&
1208           controlTerm.arity() == 2 &&
1209           controlTerm.arg(1) instanceof Integer &&
1210           controlTerm.arg(2) instanceof Integer);
1211  }
1212  private boolean signalsWaitIO(CompoundTerm controlTerm)
1213  {
1214    return(controlTerm.functor().equals("ec_waitio") &&
1215           controlTerm.arity() == 1 &&
1216           controlTerm.arg(1) instanceof Integer);
1217  }
1218  private boolean signalsCloseQueue(CompoundTerm controlTerm)
1219  {
1220    return(controlTerm.functor().equals("queue_close") &&
1221           controlTerm.arity() == 1 &&
1222           controlTerm.arg(1) instanceof Integer);
1223  }
1224  private boolean signalsOpenQueue(CompoundTerm controlTerm)
1225  {
1226    return(controlTerm.functor().equals("socket_client") &&
1227           controlTerm.arity() == 4 &&
1228           controlTerm.arg(1) instanceof Integer &&
1229           controlTerm.arg(2) instanceof Atom &&
1230           controlTerm.arg(3) instanceof Atom &&
1231           controlTerm.arg(4) instanceof Atom);
1232  }
1233
1234
1235  private void respondMultilateralDisconnect()
1236    throws IOException
1237  {
1238    //System.out.println("disconnection signal recieved from eclipse");
1239    // respond with the disconnect_resume acknowledgement
1240    writeControl(disconnectResumeAtom);
1241    //System.out.println("sent disconnect_resume message");
1242    // clean up java side
1243    terminateJavaSide();
1244    //System.out.println("completed java side of disconnection");
1245    throw(new EclipseTerminatedException());
1246  }
1247  private void respondUnilateralDisconnect()
1248    throws IOException
1249  {
1250    // clean up java side
1251    terminateJavaSide();
1252    //System.out.println("completed java side of disconnection");
1253    throw(new EclipseTerminatedException());
1254  }
1255  private void respondFlushIO(Integer streamID, Integer bytesFlushed)
1256    throws IOException
1257  {
1258    FromEclipseQueue feq;
1259    // look up the FromEclipseQueue based on the supplied stream number
1260    feq = lookupFromEclipseQueue(streamID.intValue());
1261    // if it is not there, print a message to stderr
1262    if (feq == null)
1263      System.err.println("ECLiPSe yielded after flushing stream "+streamID.intValue() +
1264                  " which is not registered as FromEclipseQueue.");
1265    else
1266    {
1267      bufferBytesFromSocket(streamID.intValue(), bytesFlushed.intValue());
1268      feq.notifyAvailable();
1269    }
1270
1271    // pass control back to eclipse
1272
1273  }
1274
1275  void respondWaitIO(Integer streamID) throws IOException
1276  {
1277    super.respondWaitIO(streamID);
1278    // waitIO must always be answered with at least one rem_flushio/yield signal
1279    // pair. In this case the number of bytes flushed is 0, in case nothing
1280    // has been written to the queue. If something has been written but not
1281    // flushed, this is the API user's problem. The ECLiPSe-side read should
1282    // just block.
1283    writeControl(new CompoundTermImpl("rem_flushio",
1284                                      streamID,
1285                                      new Integer (0)));
1286    // hand control to Eclipse and handle any resulting events until yield
1287    waitForEclipse(false);
1288
1289
1290  }
1291
1292  private void respondRemoteOpenQueue(Integer port, Atom nameAtom, Atom type,
1293                                      Atom direction)
1294    throws IOException
1295  {
1296
1297    if(type.equals(syncAtom))
1298    {
1299      if(direction.equals(toecAtom))
1300      {
1301	setupRemoteToecQueue(nameAtom, port.intValue());
1302	createToEclipseQueue(nameAtom.functor());
1303
1304	return;
1305      }
1306
1307      if(direction.equals(fromecAtom))
1308      {
1309	setupRemoteFromecQueue(nameAtom, port.intValue());
1310	createFromEclipseQueue(nameAtom.functor());
1311
1312	return;
1313      }
1314
1315      throw new IOException("Remote interface protocol error: queue direction not recognised.");
1316    }
1317    else if(type.equals(asyncAtom))
1318    {
1319      if(direction.equals(bidirectAtom))
1320      {
1321	setupRemoteAsyncecQueue(nameAtom, port.intValue());
1322	createAsyncEclipseQueue(nameAtom.functor());
1323
1324	return;
1325      }
1326      throw new IOException("Remote interface protocol error: queue direction not recognised.");
1327    }
1328
1329    throw new IOException("Remote interface protocol error: queue type not recognised.");
1330
1331  }
1332
1333  class MultilateralDisconnectSignal extends ControlSignal
1334  {
1335    void respond() throws IOException
1336    {
1337      respondMultilateralDisconnect();
1338    }
1339  }
1340
1341  class UnilateralDisconnectSignal extends ControlSignal
1342  {
1343    void respond() throws IOException
1344    {
1345      respondUnilateralDisconnect();
1346    }
1347  }
1348
1349  class FlushIOSignal extends ControlSignal
1350  {
1351    private Integer streamID, bytesFlushed;
1352
1353    FlushIOSignal(Integer streamID, Integer bytesFlushed)
1354    {
1355      this.streamID = streamID;
1356      this.bytesFlushed = bytesFlushed;
1357    }
1358
1359    void respond() throws IOException
1360    {
1361      respondFlushIO(streamID, bytesFlushed);
1362    }
1363  }
1364
1365  class CloseQueueSignal extends ControlSignal
1366  {
1367    private Integer streamID;
1368
1369    CloseQueueSignal(Integer streamID)
1370    {
1371      this.streamID = streamID;
1372    }
1373
1374    void respond() throws IOException
1375    {
1376      respondCloseQueue(streamID);
1377    }
1378  }
1379
1380  class RemoteOpenQueueSignal extends ControlSignal
1381  {
1382    private Integer port;
1383    private Atom nameAtom;
1384    private Atom type;
1385    private Atom direction;
1386
1387    RemoteOpenQueueSignal(Integer port, Atom nameAtom, Atom type, Atom direction)
1388    {
1389      this.port = port;
1390      this.nameAtom = nameAtom;
1391      this.type = type;
1392      this.direction = direction;
1393    }
1394
1395    void respond() throws IOException
1396    {
1397      respondRemoteOpenQueue(port, nameAtom, type, direction);
1398    }
1399  }
1400
1401  private FromEclipseQueueInfo getFromecInfo(int streamID)
1402  {
1403    return((FromEclipseQueueInfo) queueInfo.get(new Integer(streamID)));
1404  }
1405
1406  private void bufferBytesFromSocket(int streamID, int nBytes)
1407    throws IOException
1408  {
1409    try
1410    {
1411      getFromecInfo(streamID).getBuffer().readBytesFromSocket(nBytes);
1412    }
1413    catch(EOFException e)
1414    {
1415      unilateralDisconnect();
1416      throw new EclipseTerminatedException();
1417    }
1418  }
1419
1420  private void closeFromecSocket(int streamID) throws IOException
1421  {
1422    getFromecInfo(streamID).getSocket().close();
1423  }
1424
1425  private void removeInfo(int streamID)
1426  {
1427    queueInfo.remove(new Integer(streamID));
1428  }
1429
1430  private void setupFromecInfo(int streamID, Socket socket) throws IOException
1431  {
1432
1433    queueInfo.put(new Integer(streamID), new FromEclipseQueueInfo(socket));
1434  }
1435
1436  private int readByteFromBuffer(int streamID)
1437  {
1438    return(getFromecInfo(streamID).readByteFromBuffer());
1439  }
1440
1441  private int readBytesFromBuffer(int streamID, byte[] b, int off, int len)
1442  {
1443    return(getFromecInfo(streamID).readBytesFromBuffer(b, off, len));
1444  }
1445
1446  private int availableInBuffer(int streamID)
1447  {
1448    return(getFromecInfo(streamID).availableInBuffer());
1449  }
1450
1451  private AsyncEclipseQueueInfo getAsyncecInfo(int streamID)
1452  {
1453    return((AsyncEclipseQueueInfo) queueInfo.get(new Integer(streamID)));
1454  }
1455
1456  private void closeAsyncecSocket(int streamID) throws IOException
1457  {
1458    getAsyncecInfo(streamID).getSocket().close();
1459  }
1460
1461  private void setupAsyncecInfo(int streamID, Socket socket) throws IOException
1462  {
1463
1464    queueInfo.put(new Integer(streamID), new AsyncEclipseQueueInfo(socket));
1465  }
1466
1467  InputStream getAsyncInputStream(int streamID) throws IOException
1468  {
1469    return getAsyncecInfo(streamID).getInputStream();
1470  }
1471
1472  OutputStream getAsyncOutputStream(int streamID) throws IOException
1473  {
1474    return getAsyncecInfo(streamID).getOutputStream();
1475  }
1476
1477  void closeAsyncEclipseStreamJavaSide(int streamid) throws IOException
1478  {
1479    super.closeAsyncEclipseStreamJavaSide(streamid);
1480    closeAsyncecSocket(streamid);
1481  }
1482
1483  private class FromEclipseQueueInfo
1484  {
1485    private Socket socket;
1486    private FromEclipseQueueBuffer fromEclipseQueueBuffer;
1487    FromEclipseQueueInfo(Socket socket) throws IOException
1488    {
1489      this.socket = socket;
1490      this.fromEclipseQueueBuffer
1491        = new FromEclipseQueueBuffer(socket.getInputStream());
1492    }
1493
1494    FromEclipseQueueBuffer getBuffer()
1495    {
1496      return(fromEclipseQueueBuffer);
1497    }
1498
1499    Socket getSocket()
1500    {
1501      return(socket);
1502    }
1503
1504    int readByteFromBuffer()
1505    {
1506      return(fromEclipseQueueBuffer.readByte());
1507    }
1508    int readBytesFromBuffer(byte[] b, int off, int len)
1509    {
1510      return(fromEclipseQueueBuffer.readBytes(b, off, len));
1511    }
1512    int availableInBuffer()
1513    {
1514      return(fromEclipseQueueBuffer.available());
1515    }
1516
1517  }
1518
1519  private class AsyncEclipseQueueInfo
1520  {
1521    private Socket socket;
1522
1523    AsyncEclipseQueueInfo(Socket socket) throws IOException
1524    {
1525      this.socket = socket;
1526    }
1527
1528    Socket getSocket()
1529    {
1530      return(socket);
1531    }
1532
1533    InputStream getInputStream() throws IOException
1534    {
1535      return(socket.getInputStream());
1536    }
1537
1538    OutputStream getOutputStream() throws IOException
1539    {
1540      return(socket.getOutputStream());
1541    }
1542  }
1543
1544
1545
1546  private ToEclipseQueueInfo getToecInfo(int streamID)
1547  {
1548    return((ToEclipseQueueInfo) queueInfo.get(new Integer(streamID)));
1549  }
1550
1551  private void closeToecSocket(int streamID) throws IOException
1552  {
1553    getToecInfo(streamID).getSocket().close();
1554  }
1555
1556  private void setupToecInfo(int streamID, Socket socket) throws IOException
1557  {
1558
1559    queueInfo.put(new Integer(streamID), new ToEclipseQueueInfo(socket));
1560  }
1561
1562  private void setBytesBuffered(int streamID, int newVal)
1563  {
1564    getToecInfo(streamID).setBytesBuffered(newVal);
1565  }
1566
1567  private int getBytesBuffered(int streamID)
1568  {
1569    return(getToecInfo(streamID).getBytesBuffered());
1570  }
1571
1572  private OutputStream getOutputStream(int streamID)
1573  {
1574    return(getToecInfo(streamID).getOutputStream());
1575  }
1576
1577  private class ToEclipseQueueInfo
1578  {
1579    private Socket socket;
1580    private int bytesBuffered;
1581    private OutputStream outputStream;
1582
1583    void setBytesBuffered(int newValue)
1584    {
1585      bytesBuffered = newValue;
1586    }
1587    int getBytesBuffered()
1588    {
1589      return(bytesBuffered);
1590    }
1591    OutputStream getOutputStream()
1592    {
1593      return(outputStream);
1594    }
1595
1596    ToEclipseQueueInfo(Socket socket) throws IOException
1597    {
1598      this.socket = socket;
1599      this.outputStream = new NonBlockingOutputStream(socket.getOutputStream());
1600      bytesBuffered = 0;
1601    }
1602
1603    Socket getSocket()
1604    {
1605      return(socket);
1606    }
1607
1608  }
1609
1610  /**
1611   * An OutputStream whose write and flush methods do not block. This is useful
1612   * for when we would like to write large amounts of data to an output stream
1613   * but where that OutputStream's write/flush methods would normally block when
1614   * given such a large amount of input data.
1615   *
1616   * An instance is initialised by passing it a reference to the underlying
1617   * OutputStream.
1618   *
1619   * NonBlockingOutputStream's write methods store the bytes in a buffer and
1620   * therefore do not block.
1621   *
1622   * The flush method clears the buffer and puts its contents as a chunk of bytes
1623   * on a ByteChunkQueue. There is a "consumer" thread at the other end of the
1624   * queue, reading chunks of bytes and writing them to the underlying stream,
1625   * flushing it after each chunk.
1626   *
1627   * The write/flush methods will not raise IOExceptions by writing to the
1628   * buffer. However, if the copier thread was thrown an IOException when trying
1629   * to write to or flush the underlying stream, this IOException is stored,
1630   * and thrown at the next call of flush, write or close, with the text written
1631   * within an appropriate message. The close method may throw
1632   * IOExceptions either because an IOException from before was stored, or
1633   * because one was raised during its own operations.
1634   *
1635   *
1636   */
1637  private class NonBlockingOutputStream extends OutputStream
1638  {
1639    // The underlying stream where bytes are eventually written by the copier
1640    // thread
1641    private OutputStream underlying_stream;
1642
1643    // The byte array where bytes are initially written
1644    private ByteArrayOutputStream bufferStream;
1645
1646    // the copier thread (member class)
1647    private CopierThread copierThread;
1648
1649    // if an IOException was thrown during writing to the underlying stream,
1650    // it is stored using this reference
1651    private IOException thrownException;
1652
1653    // the queue on which chunks of bytes are queued, one chunk for each time the
1654    // flush method is called
1655    private ByteChunkQueue byteChunkQueue;
1656
1657    // constructor
1658    NonBlockingOutputStream(OutputStream underlying_stream)
1659    {
1660      // Initialise instance variables
1661      thrownException = null;
1662      this.underlying_stream = underlying_stream;
1663      bufferStream = new ByteArrayOutputStream();
1664      byteChunkQueue = new ByteChunkQueue();
1665      copierThread = new CopierThread();
1666      // Start the copier thread
1667      copierThread.start();
1668    }
1669
1670    // Method to throw the last IOException, wrapped in an appropriate message.
1671    // Also, resets the stored exception.
1672    private void throwLastIOException() throws IOException
1673    {
1674      if(thrownException != null)
1675      {
1676        throw(thrownException);
1677      }
1678    }
1679
1680    // The public write methods work as follows:
1681    // 1. they throw any left-over IOExceptions
1682    // 2. they write the data to the buffer
1683    public void write(int b) throws IOException
1684    {
1685      throwLastIOException();
1686      bufferStream.write(b);
1687    }
1688
1689    public void write(byte[] b) throws IOException
1690    {
1691      throwLastIOException();
1692      bufferStream.write(b);
1693    }
1694
1695    public void write(byte[] b, int off, int len) throws IOException
1696    {
1697      throwLastIOException();
1698      bufferStream.write(b, off, len);
1699    }
1700
1701    // returns the buffer contents as a byte array and clears the buffer.
1702    private byte[] getByteArrayAndReset()
1703    {
1704      byte[] bytes = bufferStream.toByteArray();
1705      bufferStream.reset();
1706      return(bytes);
1707    }
1708
1709    // reads a chunk from the front of the queue, writes it to the underlying
1710    // stream and flushes it. Any IOExceptions from these operations are stored
1711    // in thrownException
1712    private void copyAndFlush()
1713    {
1714      // Get the next chunk of bytes from the buffer.
1715      byte[] copy_chunk = byteChunkQueue.retrieveChunk();
1716
1717      try
1718      {
1719        // Try to write them to the buffer
1720        underlying_stream.write(copy_chunk);
1721
1722        // If this works, try to flush underlying stream
1723        underlying_stream.flush();
1724      }
1725      catch(IOException ioe)
1726      {
1727        // Store any thrown exception and make the next invocation of
1728        // testTerminated disconnect unilaterally.
1729        thrownException = ioe;
1730        shouldDisconnect = true;
1731      }
1732
1733    }
1734
1735    // throws any leftover exception.
1736    // clears the buffer and stores its bytes as a chunk on the end of the queue
1737    // it wakes up any threads waiting on the queue.
1738    public void flush() throws IOException
1739    {
1740      // throw any Exceptions if there are any
1741      throwLastIOException();
1742      // if there are any bytes in the buffer
1743      if(bufferStream.size() > 0)
1744        {
1745          // get the buffer as a byte chunk and reset it
1746          byte[] byteChunk = getByteArrayAndReset();
1747          // add the chunk to the queue
1748          byteChunkQueue.addChunk(byteChunk);
1749          // wake up the copier thread
1750          copierThread.wake();
1751        }
1752    }
1753
1754    // NOTE: does not wait until byte chunk queue is empty. Does wait until
1755    // copierThread is dead.
1756    public void close() throws IOException
1757    {
1758      // throw any stored exceptions
1759      throwLastIOException();
1760      // stop the copier thread
1761      copierThread.terminate();
1762
1763      copierThread.interrupt(); // in case copierthread waiting and has not been
1764                                // notified
1765
1766      // keep sleeping 1/4 of a second until copierThread is dead.
1767      while(copierThread.isAlive())
1768      {
1769        try
1770        {
1771          Thread.currentThread().sleep(250);
1772        }
1773        catch(InterruptedException ie)
1774        {
1775        }
1776      }
1777      // close the underlying stream
1778      underlying_stream.close();
1779    }
1780
1781    // Object to represent the queue of byte chunks
1782    private class ByteChunkQueue
1783    {
1784      // represent the queue using Vector
1785      Vector queue;
1786      // constructor
1787      ByteChunkQueue()
1788      {
1789        queue = new Vector();
1790      }
1791
1792      // synchronized methods for adding/retrieving and checking/waiting if empty.
1793      synchronized boolean isEmpty()
1794      {
1795        return(queue.isEmpty());
1796      }
1797
1798      synchronized void waitUntilEmpty()
1799      {
1800        while(!isEmpty())
1801        {
1802          try
1803          {
1804            wait();
1805          }
1806          catch(InterruptedException ie) {}
1807        }
1808      }
1809
1810      synchronized void addChunk(byte[] byteChunk)
1811      {
1812        queue.add(byteChunk);
1813      }
1814
1815      synchronized byte[] retrieveChunk()
1816      {
1817        byte[] removedChunk = (byte[]) queue.remove(0);
1818        notifyAll(); // to wake anything waiting until empty.
1819        return(removedChunk);
1820      }
1821    }
1822
1823    // class to implement thread which will copy and flush byte chunks
1824    private class CopierThread extends Thread
1825    {
1826      private boolean active = true;
1827
1828      public CopierThread() {
1829        super();
1830        setDaemon(true);
1831      }
1832
1833      // wait until either terminate or wake is called by another thread
1834      synchronized void waitWoken()
1835      {
1836        try
1837        {
1838          this.wait(1000); // timeout of one second
1839          // NOTE a timeout is needed because the notify may happen while the
1840          // copier thread is not in the wait state.
1841        }
1842        catch(InterruptedException ie)
1843        {}
1844      }
1845
1846      synchronized void terminate()
1847      {
1848        active = false;
1849        notifyAll();
1850      }
1851
1852      synchronized void wake()
1853      {
1854        notifyAll();
1855      }
1856
1857      // Main method run when thread is started.
1858      public void run()
1859      {
1860        // Repeat this loop until terminated
1861        while(active)
1862        {
1863          // if the byteChunkQueue still has chunks, copy and flush them
1864         if(!byteChunkQueue.isEmpty())
1865         {
1866            copyAndFlush();
1867          }
1868          // if it is empty, wait until woken by terminate or wake
1869	 else
1870          {
1871            this.waitWoken();
1872          }
1873        }
1874      }
1875    }
1876  }
1877
1878
1879
1880}
1881