• Home
  • History
  • Annotate
  • Line#
  • Navigate
  • Raw
  • Download
  • only in /barrelfish-2018-10-04/usr/eclipseclp/JavaInterface/src/com/parctechnologies/eclipse/
1// BEGIN LICENSE BLOCK
2// Version: CMPL 1.1
3//
4// The contents of this file are subject to the Cisco-style Mozilla Public
5// License Version 1.1 (the "License"); you may not use this file except
6// in compliance with the License.  You may obtain a copy of the License
7// at www.eclipse-clp.org/license.
8//
9// Software distributed under the License is distributed on an "AS IS"
10// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.  See
11// the License for the specific language governing rights and limitations
12// under the License.
13//
14// The Original Code is  The ECLiPSe Constraint Logic Programming System.
15// The Initial Developer of the Original Code is  Cisco Systems, Inc.
16// Portions created by the Initial Developer are
17// Copyright (C) 2000 - 2006 Cisco Systems, Inc.  All Rights Reserved.
18//
19// Contributor(s): Stefano Novello / Josh Singer, Parc Technologies
20//
21// END LICENSE BLOCK
22
23//Title:        Java/ECLiPSe interface
24//Version:      $Id: EclipseConnectionImpl.java,v 1.1 2006/09/23 01:54:09 snovello Exp $
25//Author:       Josh Singer / Stefano Novello
26//Company:      Parc Technologies
27//Description:  Abstract class providing common code for classes which implement the EclipseConnection interface.
28package com.parctechnologies.eclipse;
29import java.io.*;
30import java.util.*;
31import java.net.Socket;
32
33/**
34 * Abstract superclass of classes which implement the EclipseConnection interface.
35 *
36 * @see EmbeddedEclipse
37 * @see RemoteEclipse
38 */
39public abstract class EclipseConnectionImpl implements EclipseConnection
40{
41
42  /**
43   * Flag to indicate whether the ECLiPSe engine, or the connection to it,
44   * has been terminated.
45   */
46  boolean terminated = false;
47
48  /**
49   * Maps ECLiPSe stream numbers (Integers) to the corresponding FromEclipseQueue.
50   * Every FromEclipseQueue that is created for this EclipseConnectionImpl
51   * is registered in this map with its
52   * stream number. When it is closed, it is removed from the map.
53   */
54  private Map fromEclipseQueueRegister = new HashMap();
55
56  /**
57   * Maps ECLiPSe stream numbers (Integers) to the corresponding ToEclipseQueue.
58   * Every ToEclipseQueue that is created is registered in this map with its
59   * stream number. When it is closed, it is removed from the map.
60   */
61  private Map toEclipseQueueRegister = new HashMap();
62
63  /**
64   * Maps ECLiPSe stream numbers (Integers) to the corresponding AsyncEclipseQueue.
65   * Every AsyncEclipseQueue that is created is registered in this map with its
66   * stream number. When it is closed, it is removed from the map.
67   */
68  private Map asyncEclipseQueueRegister = new HashMap();
69
70  /**
71   * Stream used to send RPC goals to Eclipse. This must be initialised by the
72   * concrete subclass -- no initialisation is performed here.
73   */
74  EXDROutputStream toEclipse;
75
76  /**
77   * Stream used to receive RPC goals from Eclipse. This must be initialised by the
78   * concrete subclass -- no initialisation is performed here.
79   */
80  EXDRInputStream fromEclipse;
81
82  /**
83   * The singleton EclipseMultitaskConnection returned/created by the
84   * registerMultitask method.
85   */
86  EclipseMultitaskConnection eclipseMultitaskConnection;
87
88  /**
89   * Peer name by which the Java side of the Eclipse connection is known in
90   * ECLiPSe.
91   */
92  private Atom _peerName;
93
94  // implements method required in EclipseConnection interface.
95  public Atom getPeerName()
96  {
97    return(_peerName);
98  }
99
100  // set the peer name in this object. Should be called by subclass during
101  // initialisation
102  void setPeerName(Atom peerName)
103  {
104    _peerName = peerName;
105  }
106
107  /**
108   * Test whether this EclipseConnectionImpl has been terminated, and if so,
109   * throw an EclipseTerminatedException. This should be called at the beginning
110   * of the implementations of the public methods of EclipseConnection.
111   */
112  void testTerminated() throws EclipseTerminatedException
113  {
114    if(terminated)
115      throw new EclipseTerminatedException();
116  }
117
118  /**
119   * Invoke the <code>close()</code> method on all registered user queues (i.e.
120   * not system queues, such as ec_rpc_out in the embedded case).
121   *
122   * @param ec_side determines whether the eclipse side is closed as well as
123   * the java side.
124   */
125  void closeAllQueues(boolean ec_side) throws IOException
126  {
127    // Invoke the close() method on all registered FromEclipseQueues
128    closeAllFromEclipseQueues(ec_side);
129    // Invoke the close() method on all registered ToEclipseQueues
130    closeAllToEclipseQueues(ec_side);
131    closeAllAsyncEclipseQueues(ec_side);
132  }
133
134  /**
135   * Close all registered FromEclipseQueues (user queues only).
136   * @param ec_side determines whether the eclipse side is closed as well as
137   * the java side.
138   */
139  private void closeAllFromEclipseQueues(boolean ec_side) throws IOException
140  {
141    // We get the collection of queues by taking a copy of the value set
142    // of the register. We take a copy because the close() method will
143    // alter this set while we are iterating over it.
144    Collection fromEclipseQueues = new LinkedList(fromEclipseQueueRegister.values());
145    Iterator i = fromEclipseQueues.iterator();
146    FromEclipseQueue feq;
147
148    while(i.hasNext())
149    {
150      feq = (FromEclipseQueue) i.next();
151      if(!feq.isSystemQueue())
152      {
153        feq.close_cleanup();
154        this.closeFromEclipseStreamJavaSide(feq.getID());
155        if(ec_side)
156        {
157          this.closeFromEclipseStreamEclipseSide(feq.getID());
158        }
159      }
160    }
161  }
162
163  /**
164   * Close all registered ToEclipseQueues (user queues only).
165   * @param ec_side determines whether the eclipse side is closed as well as
166   * the java side.
167   */
168  private void closeAllToEclipseQueues(boolean ec_side) throws IOException
169  {
170    // We get the collection of queues by taking a copy of the value set
171    // of the register. We take a copy because the close() method will
172    // alter this set while we are iterating over it.
173    Collection toEclipseQueues = new LinkedList(toEclipseQueueRegister.values());
174    Iterator i = toEclipseQueues.iterator();
175    ToEclipseQueue teq;
176
177    while(i.hasNext())
178    {
179      teq = (ToEclipseQueue) i.next();
180      if(!teq.isSystemQueue())
181      {
182        this.closeToEclipseStreamJavaSide(teq.getID());
183        if(ec_side)
184        {
185          this.closeToEclipseStreamEclipseSide(teq.getID());
186        }
187      }
188    }
189  }
190
191  /**
192   * Close all registered AsyncEclipseQueues (user queues only).
193   * @param ec_side determines whether the eclipse side is closed as well as
194   * the java side.
195   */
196  private void closeAllAsyncEclipseQueues(boolean ec_side) throws IOException
197  {
198    // We get the collection of queues by taking a copy of the value set
199    // of the register. We take a copy because the close() method will
200    // alter this set while we are iterating over it.
201    Collection asyncEclipseQueues = new LinkedList(asyncEclipseQueueRegister.values());
202    Iterator i = asyncEclipseQueues.iterator();
203    AsyncEclipseQueue aeq;
204
205    while(i.hasNext())
206    {
207      aeq = (AsyncEclipseQueue) i.next();
208      if(!aeq.isSystemQueue())
209      {
210        this.closeAsyncEclipseStreamJavaSide(aeq.getID());
211        if(ec_side)
212        {
213          this.closeAsyncEclipseStreamEclipseSide(aeq.getID());
214        }
215      }
216    }
217  }
218
219  /**
220   * Get an already registered FromEclipseQueue of this ECLiPSe given its
221   * ECLiPSe stream number.
222   */
223  FromEclipseQueue lookupFromEclipseQueue(int id)
224  {
225    return (FromEclipseQueue) fromEclipseQueueRegister.get(new Integer(id));
226  }
227
228  /**
229   * Get an already registered ToEclipseQueue of this ECLiPSe given its
230   * ECLiPSe stream number.
231   */
232  ToEclipseQueue lookupToEclipseQueue(int id)
233  {
234    return (ToEclipseQueue) toEclipseQueueRegister.get(new Integer(id));
235  }
236
237  /**
238   * Get an already registered AsyncEclipseQueue of this ECLiPSe given its
239   * ECLiPSe stream number.
240   */
241  AsyncEclipseQueue lookupAsyncEclipseQueue(int id)
242  {
243    return (AsyncEclipseQueue) asyncEclipseQueueRegister.get(new Integer(id));
244  }
245
246  /**
247   * Register a new FromEclipseQueue with its stream number.
248   */
249  void registerFromEclipseQueue(int id, FromEclipseQueue inputQueue) throws EclipseTerminatedException
250  {
251    fromEclipseQueueRegister.put(new Integer(id), inputQueue);
252  }
253
254  /**
255   * Register a new ToEclipseQueue with its stream number.
256   */
257  void registerToEclipseQueue(int id, ToEclipseQueue outputQueue) throws EclipseTerminatedException
258  {
259    toEclipseQueueRegister.put(new Integer(id), outputQueue);
260  }
261
262  /**
263   * Register a new AsyncEclipseQueue with its stream number.
264   */
265  void registerAsyncEclipseQueue(int id, AsyncEclipseQueue queue) throws EclipseTerminatedException
266  {
267    asyncEclipseQueueRegister.put(new Integer(id), queue);
268  }
269
270  /**
271   * Unregister a FromEclipseQueue given its stream number.
272   */
273  void unregisterFromEclipseQueue(int id)
274  {
275    fromEclipseQueueRegister.remove(new Integer(id));
276  }
277
278  /**
279   * Unregister an ToEclipseQueue given its stream number.
280   */
281  void unregisterToEclipseQueue(int id)
282  {
283    toEclipseQueueRegister.remove(new Integer(id));
284  }
285
286  /**
287   * Unregister an AsyncEclipseQueue given its stream number.
288   */
289  void unregisterAsyncEclipseQueue(int id)
290  {
291    asyncEclipseQueueRegister.remove(new Integer(id));
292  }
293
294  // Implementation of public method from EclipseConnection interface
295  public synchronized void compile(File f) throws EclipseException, IOException
296  {
297    rpc(new CompoundTermImpl("compile" , getPath(f)));
298  }
299
300  // Implementation of public method from EclipseConnection interface
301  public String getPath(File f) throws EclipseException, IOException
302  {
303    CompoundTerm call = new CompoundTermImpl("os_file_name" , null , f.getAbsolutePath() );
304    return (String) rpc(call).arg(1);
305  }
306
307  // Implementation of public method from EclipseConnection interface
308  public synchronized CompoundTerm rpc(String goal) throws EclipseException,
309    IOException
310  {
311    testTerminated();
312    return executeRpc(goal);
313  }
314
315  // Implementation of public method from EclipseConnection interface
316  public CompoundTerm rpc(String functor, Object arg1) throws EclipseException, IOException
317  {
318    return(rpc(new CompoundTermImpl(functor, arg1)));
319  }
320
321  // Implementation of public method from EclipseConnection interface
322  public CompoundTerm rpc(String functor, Object arg1,
323                          Object arg2) throws EclipseException, IOException
324  {
325    return(rpc(new CompoundTermImpl(functor, arg1, arg2)));
326  }
327
328  // Implementation of public method from EclipseConnection interface
329  public CompoundTerm rpc(String functor, Object arg1,
330                          Object arg2, Object arg3) throws EclipseException, IOException
331  {
332    return(rpc(new CompoundTermImpl(functor, arg1, arg2, arg3)));
333  }
334
335  // Implementation of public method from EclipseConnection interface
336  public CompoundTerm rpc(String functor, Object arg1,
337                          Object arg2, Object arg3, Object arg4) throws EclipseException, IOException
338  {
339    return(rpc(new CompoundTermImpl(functor, arg1, arg2, arg3, arg4)));
340  }
341
342  // Implementation of public method from EclipseConnection interface
343  public CompoundTerm rpc(String functor, Object arg1,
344                          Object arg2, Object arg3, Object arg4,
345                          Object arg5) throws EclipseException, IOException
346  {
347    return(rpc(new CompoundTermImpl(functor, arg1, arg2, arg3, arg4, arg5)));
348  }
349
350  // Implementation of public method from EclipseConnection interface
351  public CompoundTerm rpc(String functor, Object[] args) throws EclipseException, IOException
352  {
353    return(rpc(new CompoundTermImpl(functor, args)));
354  }
355
356  // Implementation of public method from EclipseConnection interface
357  public CompoundTerm rpc(Object[] goalTerm) throws EclipseException, IOException
358  {
359    return(rpc(new CompoundTermImpl(goalTerm)));
360  }
361
362  // Implementation of public method from EclipseConnection interface
363  public synchronized CompoundTerm rpc(CompoundTerm goal) throws EclipseException, IOException
364  {
365    testTerminated();
366    return executeRpc(goal);
367  }
368
369  /**
370   * Common implementation for rpc, for both Strings and CompoundTerms. Relies
371   * on three abstract methods to be implemented by subclasses: sendGoal(Object),
372   * waitForEclipse() and receiveGoal().
373   */
374  private CompoundTerm executeRpc(Object goal) throws EclipseException, IOException
375  {
376    // send the goal object to ECLiPSe
377    sendGoal(goal);
378    // pass control to ECLiPSe and handle any events it generates when it
379    // returns control. Keep doing this until it reports that it has finished
380    // executing the rpc goal.
381    waitForEclipse(false);
382    // receive the goal term from ECLiPSe
383    CompoundTerm answer = (CompoundTerm) receiveGoal();
384
385    // if the returned term is the atom fail
386    if(answer.functor().equals("fail") &&
387       answer.arity() == 0)
388    {
389      // throw the appropriate fail exception
390      throw new Fail(goal);
391    }
392    // similarly for throw
393    if(answer.functor().equals("throw") &&
394       answer.arity() == 0)
395    {
396      throw new Throw(goal);
397    }
398    // otherwise return the returned goal term.
399    return answer;
400  }
401
402  // Implementation of public method from EclipseConnection interface
403  public synchronized FromEclipseQueue getFromEclipseQueue(String name)
404    throws EclipseException, IOException
405  {
406    // throw exception if terminated
407    testTerminated();
408
409    // try to get the numeric id of the stream in eclipse (returns
410    // negative if the stream name is not valid)
411    int id = getStreamNumber(name);
412
413    // if id is non-negative then see if it is registered
414    if(id >= 0)
415    {
416      FromEclipseQueue feq = lookupFromEclipseQueue(id);
417
418      // If so, return it.
419      if (feq != null)
420      {
421        return(feq);
422      }
423    }
424
425    try
426    {
427      // see if there is already a stream with the above name on the eclipse side
428      rpc("current_stream", new Atom(name));
429      // if the above goal succeeds, throw an exception
430      throw(new EclipseException("Cannot create FromEclipseQueue: stream name in use."));
431    }
432    catch(Fail e) // if the above goal fails
433    {
434      // additional setup routine
435      // this is implemented by an abstract method as it will vary according to
436      // subclass
437      setupFromEclipseQueue(name);
438      // create, register and return the queue based on this
439      return(createFromEclipseQueue(name));
440
441    }
442  }
443
444  // Implementation of public method from EclipseConnection interface
445  public synchronized ToEclipseQueue getToEclipseQueue(String name)
446    throws EclipseException, IOException
447  {
448    // throw exception if terminated
449    testTerminated();
450
451    // try to get the numeric id of the stream in eclipse (returns
452    // negative if the stream name is not valid)
453    int id = getStreamNumber(name);
454
455    // if id is non-negative then see if it is registered
456    if(id >= 0)
457    {
458      ToEclipseQueue teq = lookupToEclipseQueue(id);
459
460      // If it is, return it
461      if (teq != null)
462      {
463        return teq;
464      }
465    }
466
467    try
468    {
469      // see if there is already a stream with the above name
470      rpc("current_stream", new Atom(name));
471      // if the above goal succeeds, throw an exception
472      throw(new EclipseException("Cannot create ToEclipseQueue: stream name in use."));
473    }
474    catch(Fail e) // if the above goal fails
475    {
476      // additional setup routine
477      // this is implemented by an abstract method as it will vary according to
478      // subclass
479      setupToEclipseQueue(name);
480      // create, register and return the queue based on this
481      return(createToEclipseQueue(name));
482    }
483  }
484
485
486  // Implementation of public method from EclipseConnection interface
487  public synchronized AsyncEclipseQueue getAsyncEclipseQueue(String name)
488    throws EclipseException, IOException
489  {
490    // throw exception if terminated
491    testTerminated();
492
493    // try to get the numeric id of the stream in eclipse (returns
494    // negative if the stream name is not valid)
495    int id = getStreamNumber(name);
496
497    // if id is non-negative then see if it is registered
498    if(id >= 0)
499    {
500      AsyncEclipseQueue aeq = lookupAsyncEclipseQueue(id);
501
502      // If it is, return it
503      if (aeq != null)
504      {
505        return aeq;
506      }
507    }
508
509    try
510    {
511      // see if there is already a stream with the above name
512      rpc("current_stream", new Atom(name));
513      // if the above goal succeeds, throw an exception
514      throw(new EclipseException("Cannot create AsyncEclipseQueue: stream name in use."));
515    }
516    catch(Fail e) // if the above goal fails
517    {
518      // additional setup routine
519      // this is implemented by an abstract method as it will vary according to
520      // subclass
521      setupAsyncEclipseQueue(name);
522      // create, register and return the queue based on this
523      return(createAsyncEclipseQueue(name));
524    }
525  }
526
527  /**
528   * Creates, registers and returns a new FromEclipseQueue object. Assumes that
529   * there is no FromEclipseQueue registered in this Eclipse with the same name.
530   * Also assumes that the Eclipse side of the queue has been set up.
531   */
532  FromEclipseQueue createFromEclipseQueue(String name) throws IOException
533  {
534    int id = getStreamNumber(name);
535    FromEclipseQueue inQ = new FromEclipseQueue(id,name,this);
536    registerFromEclipseQueue(id,inQ);
537    return inQ;
538  }
539
540  /**
541   * Creates, registers and returns a new ToEclipseQueue object. Assumes that
542   * there is no ToEclipseQueue registered in this Eclipse with the same name.
543   * Also assumes that the Eclipse side of the queue has been set up.
544   */
545  ToEclipseQueue createToEclipseQueue(String name) throws IOException
546  {
547    int id = getStreamNumber(name);
548    ToEclipseQueue outQ = new ToEclipseQueue(id,name,this);
549    registerToEclipseQueue(id,outQ);
550    return outQ;
551  }
552
553  /**
554   * Creates, registers and returns a new AsyncEclipseQueue object. Assumes that
555   * there is no AsyncEclipseQueue registered in this Eclipse with the same name.
556   * Also assumes that the Eclipse side of the queue has been set up.
557   */
558  AsyncEclipseQueue createAsyncEclipseQueue(String name) throws IOException
559  {
560    int id = getStreamNumber(name);
561    AsyncEclipseQueue q = new AsyncEclipseQueue(id,name,this);
562    registerAsyncEclipseQueue(id,q);
563    return q;
564  }
565
566  InputStream getAsyncInputStream(int id) throws IOException
567  {
568    throw new IOException("Asynchronous queues not implemented for this connection type");
569  }
570
571  OutputStream getAsyncOutputStream(int id) throws IOException
572  {
573    throw new IOException("Asynchronous queues not implemented for this connection type");
574  }
575
576  /**
577   * Send an RPC goal to ECLiPSe.
578   */
579  abstract void sendGoal(Object goal) throws IOException;
580
581  /**
582   * Receive an RPC goal from ECLiPSe.
583   */
584  abstract Object receiveGoal() throws IOException;
585
586
587  /**
588   * Look up the stream number of an existing stream within this ECLiPSe,
589   * given its name. Returns negative if there is no stream with that name.
590   */
591  int getStreamNumber(String streamName) throws IOException
592  {
593    try
594    {
595      rpc("current_stream", new Atom(streamName));
596      CompoundTerm result =
597        rpc("get_stream_info", new Atom(streamName), new Atom("physical_stream"), null);
598      Integer stream_number = (Integer) result.arg(3);
599      return(stream_number.intValue());
600    }
601    catch(EclipseException f)
602    {
603      // if there is no stream with the supplied name, return -1
604      return(-1);
605    }
606
607  }
608
609
610  /**
611   * Abstract methods, must be implemented by subclasses. These are used to
612   * supply the subclass-specific implementations of certain operations used by
613   * methods in both this class and other classes in the package.
614   */
615
616  /**
617   * Perform any additional setup required to initialise a
618   * FromEclipseQueue
619   */
620  abstract void setupFromEclipseQueue(String name)
621    throws EclipseException, IOException;
622
623  /**
624   * Perform any additional setup required to initialise a
625   * ToEclipseQueue
626   */
627  abstract void setupToEclipseQueue(String name)
628    throws EclipseException, IOException;
629
630  /**
631   * Perform any additional setup required to initialise a
632   * AsyncEclipseQueue
633   */
634  abstract void setupAsyncEclipseQueue(String name)
635    throws EclipseException, IOException;
636
637  /**
638   * Keep resuming ECLiPSe and handling any control signals it generates
639   * until a yield signal occurs.
640   */
641  void waitForEclipse(boolean transferControlWithResume) throws IOException
642  {
643    boolean isFirstIteration = true;
644    ControlSignal nextControlSignal;
645    do
646    {
647      nextControlSignal =
648        getNextControlSignal(isFirstIteration, transferControlWithResume);
649      if(nextControlSignal == null)
650        throw new IOException("Unrecognised ECLiPSe control signal.");
651      isFirstIteration = false;
652      nextControlSignal.respond();
653    }
654    while(!(nextControlSignal instanceof YieldSignal));
655  }
656
657  abstract ControlSignal getNextControlSignal(boolean isFirstIteration,
658                                              boolean transferControlWithResume)
659    throws IOException;
660
661
662  abstract class ControlSignal
663  {
664    abstract void respond() throws IOException;
665  }
666
667  class YieldSignal extends ControlSignal
668  {
669    void respond() throws IOException
670    {
671      respondYield();
672    }
673  }
674
675  class WaitIOSignal extends ControlSignal
676  {
677    private Integer streamID;
678    WaitIOSignal(Integer streamID)
679    {
680      this.streamID = streamID;
681    }
682    void respond() throws IOException
683    {
684      respondWaitIO(streamID);
685    }
686  }
687
688  class OpenQueueSignal extends ControlSignal
689  {
690    private Atom nameAtom;
691    private Integer streamID;
692    private Atom direction;
693    OpenQueueSignal(Atom nameAtom, Integer streamID, Atom direction)
694    {
695      this.nameAtom = nameAtom;
696      this.streamID = streamID;
697      this.direction = direction;
698    }
699    void respond() throws IOException
700    {
701      respondOpenQueue(nameAtom, streamID, direction);
702    }
703  }
704
705  void respondYield() throws IOException
706  {
707  }
708
709  void respondWaitIO(Integer streamID) throws IOException
710  {
711    // look up the queue in the toEclipseQueue register
712    ToEclipseQueue teq = lookupToEclipseQueue(streamID.intValue());
713    // if it is not there, print a message to stderr
714    if (teq == null) {
715      System.err.println("ECLiPSe yielded after reading empty stream "+
716      streamID.intValue() +
717        " which is not registered as a ToEclipseQueue.");
718    } else {
719      // otherwise notify the queue's listener of a request for data
720      // (causes its dataRequest method to be invoked)
721      teq.notifyRequest();
722    }
723  }
724
725  void respondCloseQueue(Integer streamID) throws IOException
726  {
727    FromEclipseQueue feq = null;
728    ToEclipseQueue teq = null;
729    AsyncEclipseQueue aeq = null;
730    teq = lookupToEclipseQueue(streamID.intValue());
731    if(teq != null)
732    {
733      teq.close_cleanup();
734      closeToEclipseStreamJavaSide(streamID.intValue());
735      return;
736    }
737    feq = lookupFromEclipseQueue(streamID.intValue());
738    if(feq != null)
739    {
740      feq.close_cleanup();
741      closeFromEclipseStreamJavaSide(streamID.intValue());
742      return;
743    }
744    aeq = lookupAsyncEclipseQueue(streamID.intValue());
745    if(aeq != null)
746    {
747      aeq.close_cleanup();
748      closeAsyncEclipseStreamJavaSide(streamID.intValue());
749      return;
750    }
751    System.err.println("Cannot close "+streamID+": not the "+
752      "stream number of a registered ECLiPSe queue.");
753  }
754
755
756  void respondOpenQueue(Atom nameAtom, Integer streamID, Atom direction)
757    throws IOException
758  {
759    if(direction.functor().equals("fromec"))
760    {
761      createFromEclipseQueue(nameAtom.functor());
762    }
763    else if(direction.functor().equals("toec"))
764    {
765      createToEclipseQueue(nameAtom.functor());
766    }
767    else if(direction.functor().equals("bidirect"))
768    {
769      createAsyncEclipseQueue(nameAtom.functor());
770    }
771  }
772
773
774
775
776
777
778  /**
779   * Read <code>len</code> bytes from this ECLiPSe's stream number
780   * <code>streamid</code> and store them in
781   * byte array <code>b</code> at offset <code>off</code>.
782   *
783   * @returns the number of bytes read.
784   */
785  abstract int readFromStream(int streamid, int off, int len, byte[] b)
786    throws IOException;
787
788  /**
789   * Read a single byte from this ECLiPSe's stream number
790   * <code>streamid</code>
791   *
792   * @returns byte read, an int between 0 and 255 or -1 if 0 bytes were read.
793   */
794  abstract int readByteFromStream(int streamid) throws IOException;
795
796  /**
797   * Returns the number of bytes available on stream streamid which may be
798   * read or skipped over without blocking.
799   */
800  abstract int availableOnStream(int streamid) throws IOException;
801
802
803  /**
804   * Write <code>len</code> bytes to this ECLiPSe's stream number
805   * <code>streamid</code> at offset <code>off</code> from
806   * byte array <code>b</code>.
807   *
808   * @returns the number of bytes written.
809   */
810  abstract int writeToStream(int streamid, byte[] b, int off, int len)
811    throws IOException;
812
813  /**
814   * Write a single byte to this ECLiPSe's stream number
815   * <code>streamid</code>.
816   *
817   */
818  abstract void writeByteToStream(int streamid, byte b)
819    throws IOException;
820
821  /**
822   * Flush this ECLiPSe's stream number <code>streamid</code>.
823   *
824   */
825  abstract void flushStream(int streamid) throws IOException;
826
827  // when a ToEclipseStream is closed, unregister it. Subclasses may
828  // over-ride this method and perform additional actions, but they should call
829  // it first using super.
830  void closeToEclipseStreamJavaSide(int streamid) throws IOException
831  {
832    ToEclipseQueue teq = lookupToEclipseQueue(streamid);
833    unregisterToEclipseQueue(streamid);
834  }
835
836  // when a AsyncEclipseStream is closed, unregister it. Subclasses may
837  // over-ride this method and perform additional actions, but they should call
838  // it first using super.
839  void closeAsyncEclipseStreamJavaSide(int streamid) throws IOException
840  {
841    unregisterAsyncEclipseQueue(streamid);
842  }
843
844  // when a FromEclipseStream is closed, unregister it. Subclasses may
845  // over-ride this method and perform additional actions, but they should call
846  // it first using super.
847  void closeFromEclipseStreamJavaSide(int streamid) throws IOException
848  {
849    FromEclipseQueue feq = lookupFromEclipseQueue(streamid);
850    unregisterFromEclipseQueue(streamid);
851  }
852
853  // Common code to close the eclipse side of a From/ToEclipse queue. Subclasses
854  // will override these methods to perform additional actions, but they should
855  // invoke them first using super.
856  void closeFromEclipseStreamEclipseSide(int streamid) throws IOException
857  {
858  }
859
860  void closeToEclipseStreamEclipseSide(int streamid) throws IOException
861  {
862  }
863
864  void closeAsyncEclipseStreamEclipseSide(int streamid) throws IOException
865  {
866  }
867
868  synchronized EclipseMultitaskConnection getEclipseMultitaskConnection() throws EclipseException, IOException {
869    if ( eclipseMultitaskConnection == null ) {
870      CompoundTermImpl resultGoal =
871        (CompoundTermImpl)
872        rpc("peer_register_multitask",
873            getPeerName(),
874            null);
875      String fromStream = ((Atom)resultGoal.arg(2)).functor();
876      FromEclipseQueue queue = getFromEclipseQueue(fromStream);
877      eclipseMultitaskConnection =
878        new EclipseMultitaskConnectionImpl(this, queue);
879    }
880    return eclipseMultitaskConnection;
881  }
882
883  // implements method from EclipseConnection
884  public EclipseMultitaskConnection registerMultitask(MultitaskListener multitaskListener) throws EclipseException,IOException {
885    testTerminated();
886    EclipseMultitaskConnection emc = getEclipseMultitaskConnection();
887    return emc.registerMultitask(multitaskListener);
888  }
889}
890