// BEGIN LICENSE BLOCK
// Version: CMPL 1.1
//
// The contents of this file are subject to the Cisco-style Mozilla Public
// License Version 1.1 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License
// at www.eclipse-clp.org/license.
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and limitations
// under the License.
//
// The Original Code is The ECLiPSe Constraint Logic Programming System.
// The Initial Developer of the Original Code is Cisco Systems, Inc.
// Portions created by the Initial Developer are
// Copyright (C) 2000 - 2006 Cisco Systems, Inc. All Rights Reserved.
//
// Contributor(s): Stefano Novello / Josh Singer, Parc Technologies
//
// END LICENSE BLOCK

//Title: Java/ECLiPSe interface
//Version: $Id: EclipseConnectionImpl.java,v 1.1 2006/09/23 01:54:09 snovello Exp $
//Author: Josh Singer / Stefano Novello
//Company: Parc Technologies
//Description: Abstract class providing common code for classes which implement the EclipseConnection interface.
package com.parctechnologies.eclipse;
import java.io.*;
import java.util.*;
import java.net.Socket;

/**
 * Abstract superclass of classes which implement the EclipseConnection interface.
 *
 * <p>It keeps three registers mapping ECLiPSe stream numbers to the Java-side
 * queue objects (from-ECLiPSe, to-ECLiPSe and asynchronous), implements the
 * family of <code>rpc</code> methods on top of the abstract
 * {@link #sendGoal(Object)}, {@link #waitForEclipse(boolean)} and
 * {@link #receiveGoal()} operations, and dispatches the control signals
 * produced by {@link #getNextControlSignal(boolean, boolean)}.
 *
 * @see EmbeddedEclipse
 * @see RemoteEclipse
 */
public abstract class EclipseConnectionImpl implements EclipseConnection
{

  /**
   * Flag to indicate whether the ECLiPSe engine, or the connection to it,
   * has been terminated.
   */
  boolean terminated = false;

  /**
   * Maps ECLiPSe stream numbers (Integers) to the corresponding FromEclipseQueue.
   * Every FromEclipseQueue that is created for this EclipseConnectionImpl
   * is registered in this map with its
   * stream number. When it is closed, it is removed from the map.
   */
  private Map fromEclipseQueueRegister = new HashMap();

  /**
   * Maps ECLiPSe stream numbers (Integers) to the corresponding ToEclipseQueue.
   * Every ToEclipseQueue that is created is registered in this map with its
   * stream number. When it is closed, it is removed from the map.
   */
  private Map toEclipseQueueRegister = new HashMap();

  /**
   * Maps ECLiPSe stream numbers (Integers) to the corresponding AsyncEclipseQueue.
   * Every AsyncEclipseQueue that is created is registered in this map with its
   * stream number. When it is closed, it is removed from the map.
   */
  private Map asyncEclipseQueueRegister = new HashMap();

  /**
   * Stream used to send RPC goals to Eclipse. This must be initialised by the
   * concrete subclass -- no initialisation is performed here.
   */
  EXDROutputStream toEclipse;

  /**
   * Stream used to receive RPC goals from Eclipse. This must be initialised by the
   * concrete subclass -- no initialisation is performed here.
   */
  EXDRInputStream fromEclipse;

  /**
   * The singleton EclipseMultitaskConnection returned/created by the
   * registerMultitask method. Remains <code>null</code> until
   * {@link #getEclipseMultitaskConnection()} has created it.
   */
  EclipseMultitaskConnection eclipseMultitaskConnection;

  /**
   * Peer name by which the Java side of the Eclipse connection is known in
   * ECLiPSe.
   */
  private Atom _peerName;

  /**
   * Implements method required in EclipseConnection interface.
   *
   * @return the peer name previously stored with {@link #setPeerName(Atom)},
   * or <code>null</code> if none has been set.
   */
  public Atom getPeerName()
  {
    return _peerName;
  }

  /**
   * Set the peer name in this object. Should be called by the subclass during
   * initialisation.
   *
   * @param peerName the name by which the Java side is known in ECLiPSe.
   */
  void setPeerName(Atom peerName)
  {
    _peerName = peerName;
  }

  /**
   * Test whether this EclipseConnectionImpl has been terminated, and if so,
   * throw an EclipseTerminatedException. This should be called at the beginning
   * of the implementations of the public methods of EclipseConnection.
   *
   * @throws EclipseTerminatedException if the <code>terminated</code> flag is set.
   */
  void testTerminated() throws EclipseTerminatedException
  {
    if(terminated)
    {
      throw new EclipseTerminatedException();
    }
  }

  /**
   * Close all registered user queues (i.e. not system queues, such as
   * ec_rpc_out in the embedded case): first the FromEclipseQueues, then the
   * ToEclipseQueues, then the AsyncEclipseQueues.
   *
   * @param ec_side determines whether the eclipse side is closed as well as
   * the java side.
   * @throws IOException if closing any queue fails.
   */
  void closeAllQueues(boolean ec_side) throws IOException
  {
    closeAllFromEclipseQueues(ec_side);
    closeAllToEclipseQueues(ec_side);
    closeAllAsyncEclipseQueues(ec_side);
  }

  /**
   * Close all registered FromEclipseQueues (user queues only).
   *
   * @param ec_side determines whether the eclipse side is closed as well as
   * the java side.
   * @throws IOException if closing any queue fails.
   */
  private void closeAllFromEclipseQueues(boolean ec_side) throws IOException
  {
    // Iterate over a snapshot of the register's values: closing the Java side
    // of a stream unregisters the queue, which would otherwise modify the
    // collection we are iterating over.
    Collection fromEclipseQueues = new LinkedList(fromEclipseQueueRegister.values());
    Iterator i = fromEclipseQueues.iterator();

    while(i.hasNext())
    {
      FromEclipseQueue feq = (FromEclipseQueue) i.next();
      if(!feq.isSystemQueue())
      {
        feq.close_cleanup();
        this.closeFromEclipseStreamJavaSide(feq.getID());
        if(ec_side)
        {
          this.closeFromEclipseStreamEclipseSide(feq.getID());
        }
      }
    }
  }

  /**
   * Close all registered ToEclipseQueues (user queues only).
   *
   * @param ec_side determines whether the eclipse side is closed as well as
   * the java side.
   * @throws IOException if closing any queue fails.
   */
  private void closeAllToEclipseQueues(boolean ec_side) throws IOException
  {
    // Iterate over a snapshot of the register's values: closing the Java side
    // of a stream unregisters the queue, which would otherwise modify the
    // collection we are iterating over.
    Collection toEclipseQueues = new LinkedList(toEclipseQueueRegister.values());
    Iterator i = toEclipseQueues.iterator();

    while(i.hasNext())
    {
      ToEclipseQueue teq = (ToEclipseQueue) i.next();
      if(!teq.isSystemQueue())
      {
        // Give the queue object the chance to clean itself up, exactly as is
        // done for FromEclipseQueues above and in respondCloseQueue.
        teq.close_cleanup();
        this.closeToEclipseStreamJavaSide(teq.getID());
        if(ec_side)
        {
          this.closeToEclipseStreamEclipseSide(teq.getID());
        }
      }
    }
  }

  /**
   * Close all registered AsyncEclipseQueues (user queues only).
   *
   * @param ec_side determines whether the eclipse side is closed as well as
   * the java side.
   * @throws IOException if closing any queue fails.
   */
  private void closeAllAsyncEclipseQueues(boolean ec_side) throws IOException
  {
    // Iterate over a snapshot of the register's values: closing the Java side
    // of a stream unregisters the queue, which would otherwise modify the
    // collection we are iterating over.
    Collection asyncEclipseQueues = new LinkedList(asyncEclipseQueueRegister.values());
    Iterator i = asyncEclipseQueues.iterator();

    while(i.hasNext())
    {
      AsyncEclipseQueue aeq = (AsyncEclipseQueue) i.next();
      if(!aeq.isSystemQueue())
      {
        // Give the queue object the chance to clean itself up, exactly as is
        // done for FromEclipseQueues above and in respondCloseQueue.
        aeq.close_cleanup();
        this.closeAsyncEclipseStreamJavaSide(aeq.getID());
        if(ec_side)
        {
          this.closeAsyncEclipseStreamEclipseSide(aeq.getID());
        }
      }
    }
  }

  /**
   * Get an already registered FromEclipseQueue of this ECLiPSe given its
   * ECLiPSe stream number.
   *
   * @param id the ECLiPSe stream number.
   * @return the registered queue, or <code>null</code> if none is registered
   * under <code>id</code>.
   */
  FromEclipseQueue lookupFromEclipseQueue(int id)
  {
    return (FromEclipseQueue) fromEclipseQueueRegister.get(new Integer(id));
  }

  /**
   * Get an already registered ToEclipseQueue of this ECLiPSe given its
   * ECLiPSe stream number.
   *
   * @param id the ECLiPSe stream number.
   * @return the registered queue, or <code>null</code> if none is registered
   * under <code>id</code>.
   */
  ToEclipseQueue lookupToEclipseQueue(int id)
  {
    return (ToEclipseQueue) toEclipseQueueRegister.get(new Integer(id));
  }

  /**
   * Get an already registered AsyncEclipseQueue of this ECLiPSe given its
   * ECLiPSe stream number.
   *
   * @param id the ECLiPSe stream number.
   * @return the registered queue, or <code>null</code> if none is registered
   * under <code>id</code>.
   */
  AsyncEclipseQueue lookupAsyncEclipseQueue(int id)
  {
    return (AsyncEclipseQueue) asyncEclipseQueueRegister.get(new Integer(id));
  }

  /**
   * Register a new FromEclipseQueue with its stream number. Any queue
   * previously registered under the same number is replaced.
   */
  void registerFromEclipseQueue(int id, FromEclipseQueue inputQueue) throws EclipseTerminatedException
  {
    fromEclipseQueueRegister.put(new Integer(id), inputQueue);
  }

  /**
   * Register a new ToEclipseQueue with its stream number. Any queue
   * previously registered under the same number is replaced.
   */
  void registerToEclipseQueue(int id, ToEclipseQueue outputQueue) throws EclipseTerminatedException
  {
    toEclipseQueueRegister.put(new Integer(id), outputQueue);
  }

  /**
   * Register a new AsyncEclipseQueue with its stream number. Any queue
   * previously registered under the same number is replaced.
   */
  void registerAsyncEclipseQueue(int id, AsyncEclipseQueue queue) throws EclipseTerminatedException
  {
    asyncEclipseQueueRegister.put(new Integer(id), queue);
  }

  /**
   * Unregister a FromEclipseQueue given its stream number. Does nothing if no
   * queue is registered under that number.
   */
  void unregisterFromEclipseQueue(int id)
  {
    fromEclipseQueueRegister.remove(new Integer(id));
  }

  /**
   * Unregister a ToEclipseQueue given its stream number. Does nothing if no
   * queue is registered under that number.
   */
  void unregisterToEclipseQueue(int id)
  {
    toEclipseQueueRegister.remove(new Integer(id));
  }

  /**
   * Unregister an AsyncEclipseQueue given its stream number. Does nothing if
   * no queue is registered under that number.
   */
  void unregisterAsyncEclipseQueue(int id)
  {
    asyncEclipseQueueRegister.remove(new Integer(id));
  }

  /**
   * Implementation of public method from EclipseConnection interface.
   * Converts <code>f</code> to a path with {@link #getPath(File)} and executes
   * the goal <code>compile(Path)</code>.
   */
  public synchronized void compile(File f) throws EclipseException, IOException
  {
    rpc(new CompoundTermImpl("compile" , getPath(f)));
  }

  /**
   * Implementation of public method from EclipseConnection interface.
   * Executes <code>os_file_name(_, AbsolutePath)</code> in ECLiPSe and returns
   * the first argument of the resulting goal term.
   */
  public String getPath(File f) throws EclipseException, IOException
  {
    CompoundTerm call = new CompoundTermImpl("os_file_name" , null , f.getAbsolutePath() );
    return (String) rpc(call).arg(1);
  }

  /**
   * Implementation of public method from EclipseConnection interface.
   * Executes a goal given as a String.
   */
  public synchronized CompoundTerm rpc(String goal) throws EclipseException,
                                                           IOException
  {
    testTerminated();
    return executeRpc(goal);
  }

  // Implementation of public method from EclipseConnection interface
  public CompoundTerm rpc(String functor, Object arg1) throws EclipseException, IOException
  {
    return rpc(new CompoundTermImpl(functor, arg1));
  }

  // Implementation of public method from EclipseConnection interface
  public CompoundTerm rpc(String functor, Object arg1,
                          Object arg2) throws EclipseException, IOException
  {
    return rpc(new CompoundTermImpl(functor, arg1, arg2));
  }

  // Implementation of public method from EclipseConnection interface
  public CompoundTerm rpc(String functor, Object arg1,
                          Object arg2, Object arg3) throws EclipseException, IOException
  {
    return rpc(new CompoundTermImpl(functor, arg1, arg2, arg3));
  }

  // Implementation of public method from EclipseConnection interface
  public CompoundTerm rpc(String functor, Object arg1,
                          Object arg2, Object arg3, Object arg4) throws EclipseException, IOException
  {
    return rpc(new CompoundTermImpl(functor, arg1, arg2, arg3, arg4));
  }

  // Implementation of public method from EclipseConnection interface
  public CompoundTerm rpc(String functor, Object arg1,
                          Object arg2, Object arg3, Object arg4,
                          Object arg5) throws EclipseException, IOException
  {
    return rpc(new CompoundTermImpl(functor, arg1, arg2, arg3, arg4, arg5));
  }

  // Implementation of public method from EclipseConnection interface
  public CompoundTerm rpc(String functor, Object[] args) throws EclipseException, IOException
  {
    return rpc(new CompoundTermImpl(functor, args));
  }

  // Implementation of public method from EclipseConnection interface
  public CompoundTerm rpc(Object[] goalTerm) throws EclipseException, IOException
  {
    return rpc(new CompoundTermImpl(goalTerm));
  }

  /**
   * Implementation of public method from EclipseConnection interface.
   * Executes a goal given as a CompoundTerm; all the functor/argument
   * overloads above delegate to this method.
   */
  public synchronized CompoundTerm rpc(CompoundTerm goal) throws EclipseException, IOException
  {
    testTerminated();
    return executeRpc(goal);
  }

  /**
   * Common implementation for rpc, for both Strings and CompoundTerms. Relies
   * on three methods, two of which are abstract and implemented by subclasses:
   * sendGoal(Object), waitForEclipse(boolean) and receiveGoal().
   *
   * @param goal the goal to execute (a String or a CompoundTerm).
   * @return the goal term received back from ECLiPSe.
   * @throws Fail if the received term is the atom <code>fail</code>.
   * @throws Throw if the received term is the atom <code>throw</code>.
   */
  private CompoundTerm executeRpc(Object goal) throws EclipseException, IOException
  {
    // send the goal object to ECLiPSe
    sendGoal(goal);
    // pass control to ECLiPSe and handle any events it generates when it
    // returns control. Keep doing this until it reports that it has finished
    // executing the rpc goal.
    waitForEclipse(false);
    // receive the goal term from ECLiPSe
    CompoundTerm answer = (CompoundTerm) receiveGoal();

    // if the returned term is the atom fail
    if(answer.functor().equals("fail") &&
       answer.arity() == 0)
    {
      // throw the appropriate fail exception
      throw new Fail(goal);
    }
    // similarly for throw
    if(answer.functor().equals("throw") &&
       answer.arity() == 0)
    {
      throw new Throw(goal);
    }
    // otherwise return the returned goal term.
    return answer;
  }

  /**
   * Implementation of public method from EclipseConnection interface.
   * Returns the FromEclipseQueue already registered for the stream called
   * <code>name</code>, or sets up and registers a new one.
   *
   * @throws EclipseException if a stream called <code>name</code> exists on
   * the ECLiPSe side but is not registered here as a FromEclipseQueue.
   */
  public synchronized FromEclipseQueue getFromEclipseQueue(String name)
    throws EclipseException, IOException
  {
    // throw exception if terminated
    testTerminated();

    // try to get the numeric id of the stream in eclipse (returns
    // negative if the stream name is not valid)
    int id = getStreamNumber(name);

    // if id is non-negative then see if it is registered
    if(id >= 0)
    {
      FromEclipseQueue feq = lookupFromEclipseQueue(id);

      // If so, return it.
      if (feq != null)
      {
        return feq;
      }
    }

    try
    {
      // see if there is already a stream with the above name on the eclipse side
      rpc("current_stream", new Atom(name));
      // if the above goal succeeds, throw an exception
      throw new EclipseException("Cannot create FromEclipseQueue: stream name in use.");
    }
    catch(Fail e) // if the above goal fails
    {
      // additional setup routine
      // this is implemented by an abstract method as it will vary according to
      // subclass
      setupFromEclipseQueue(name);
      // create, register and return the queue based on this
      return createFromEclipseQueue(name);
    }
  }

  /**
   * Implementation of public method from EclipseConnection interface.
   * Returns the ToEclipseQueue already registered for the stream called
   * <code>name</code>, or sets up and registers a new one.
   *
   * @throws EclipseException if a stream called <code>name</code> exists on
   * the ECLiPSe side but is not registered here as a ToEclipseQueue.
   */
  public synchronized ToEclipseQueue getToEclipseQueue(String name)
    throws EclipseException, IOException
  {
    // throw exception if terminated
    testTerminated();

    // try to get the numeric id of the stream in eclipse (returns
    // negative if the stream name is not valid)
    int id = getStreamNumber(name);

    // if id is non-negative then see if it is registered
    if(id >= 0)
    {
      ToEclipseQueue teq = lookupToEclipseQueue(id);

      // If it is, return it
      if (teq != null)
      {
        return teq;
      }
    }

    try
    {
      // see if there is already a stream with the above name
      rpc("current_stream", new Atom(name));
      // if the above goal succeeds, throw an exception
      throw new EclipseException("Cannot create ToEclipseQueue: stream name in use.");
    }
    catch(Fail e) // if the above goal fails
    {
      // additional setup routine
      // this is implemented by an abstract method as it will vary according to
      // subclass
      setupToEclipseQueue(name);
      // create, register and return the queue based on this
      return createToEclipseQueue(name);
    }
  }

  /**
   * Implementation of public method from EclipseConnection interface.
   * Returns the AsyncEclipseQueue already registered for the stream called
   * <code>name</code>, or sets up and registers a new one.
   *
   * @throws EclipseException if a stream called <code>name</code> exists on
   * the ECLiPSe side but is not registered here as an AsyncEclipseQueue.
   */
  public synchronized AsyncEclipseQueue getAsyncEclipseQueue(String name)
    throws EclipseException, IOException
  {
    // throw exception if terminated
    testTerminated();

    // try to get the numeric id of the stream in eclipse (returns
    // negative if the stream name is not valid)
    int id = getStreamNumber(name);

    // if id is non-negative then see if it is registered
    if(id >= 0)
    {
      AsyncEclipseQueue aeq = lookupAsyncEclipseQueue(id);

      // If it is, return it
      if (aeq != null)
      {
        return aeq;
      }
    }

    try
    {
      // see if there is already a stream with the above name
      rpc("current_stream", new Atom(name));
      // if the above goal succeeds, throw an exception
      throw new EclipseException("Cannot create AsyncEclipseQueue: stream name in use.");
    }
    catch(Fail e) // if the above goal fails
    {
      // additional setup routine
      // this is implemented by an abstract method as it will vary according to
      // subclass
      setupAsyncEclipseQueue(name);
      // create, register and return the queue based on this
      return createAsyncEclipseQueue(name);
    }
  }

  /**
   * Creates, registers and returns a new FromEclipseQueue object. Assumes that
   * there is no FromEclipseQueue registered in this Eclipse with the same name.
   * Also assumes that the Eclipse side of the queue has been set up.
   */
  FromEclipseQueue createFromEclipseQueue(String name) throws IOException
  {
    int id = getStreamNumber(name);
    FromEclipseQueue inQ = new FromEclipseQueue(id,name,this);
    registerFromEclipseQueue(id,inQ);
    return inQ;
  }

  /**
   * Creates, registers and returns a new ToEclipseQueue object. Assumes that
   * there is no ToEclipseQueue registered in this Eclipse with the same name.
   * Also assumes that the Eclipse side of the queue has been set up.
   */
  ToEclipseQueue createToEclipseQueue(String name) throws IOException
  {
    int id = getStreamNumber(name);
    ToEclipseQueue outQ = new ToEclipseQueue(id,name,this);
    registerToEclipseQueue(id,outQ);
    return outQ;
  }

  /**
   * Creates, registers and returns a new AsyncEclipseQueue object. Assumes that
   * there is no AsyncEclipseQueue registered in this Eclipse with the same name.
   * Also assumes that the Eclipse side of the queue has been set up.
   */
  AsyncEclipseQueue createAsyncEclipseQueue(String name) throws IOException
  {
    int id = getStreamNumber(name);
    AsyncEclipseQueue q = new AsyncEclipseQueue(id,name,this);
    registerAsyncEclipseQueue(id,q);
    return q;
  }

  /**
   * Default implementation: asynchronous queues are not supported, so this
   * always throws. Connection types that support them override this method.
   *
   * @throws IOException always.
   */
  InputStream getAsyncInputStream(int id) throws IOException
  {
    throw new IOException("Asynchronous queues not implemented for this connection type");
  }

  /**
   * Default implementation: asynchronous queues are not supported, so this
   * always throws. Connection types that support them override this method.
   *
   * @throws IOException always.
   */
  OutputStream getAsyncOutputStream(int id) throws IOException
  {
    throw new IOException("Asynchronous queues not implemented for this connection type");
  }

  /**
   * Send an RPC goal to ECLiPSe.
   */
  abstract void sendGoal(Object goal) throws IOException;

  /**
   * Receive an RPC goal from ECLiPSe.
   */
  abstract Object receiveGoal() throws IOException;


  /**
   * Look up the stream number of an existing stream within this ECLiPSe,
   * given its name. Returns negative if there is no stream with that name.
   *
   * @param streamName the ECLiPSe name of the stream.
   * @return the stream number, or -1 if either of the lookup goals raised an
   * EclipseException.
   */
  int getStreamNumber(String streamName) throws IOException
  {
    try
    {
      rpc("current_stream", new Atom(streamName));
      CompoundTerm result =
        rpc("get_stream_info", new Atom(streamName), new Atom("physical_stream"), null);
      Integer stream_number = (Integer) result.arg(3);
      return stream_number.intValue();
    }
    catch(EclipseException f)
    {
      // if there is no stream with the supplied name, return -1
      return -1;
    }
  }


  /*
   * Abstract methods, must be implemented by subclasses. These are used to
   * supply the subclass-specific implementations of certain operations used by
   * methods in both this class and other classes in the package.
   */

  /**
   * Perform any additional setup required to initialise a
   * FromEclipseQueue
   */
  abstract void setupFromEclipseQueue(String name)
    throws EclipseException, IOException;

  /**
   * Perform any additional setup required to initialise a
   * ToEclipseQueue
   */
  abstract void setupToEclipseQueue(String name)
    throws EclipseException, IOException;

  /**
   * Perform any additional setup required to initialise an
   * AsyncEclipseQueue
   */
  abstract void setupAsyncEclipseQueue(String name)
    throws EclipseException, IOException;

  /**
   * Keep resuming ECLiPSe and handling any control signals it generates
   * until a yield signal occurs. Each signal, including the final yield
   * signal, has its <code>respond()</code> method invoked.
   *
   * @param transferControlWithResume passed unchanged to
   * {@link #getNextControlSignal(boolean, boolean)}.
   * @throws IOException if no control signal could be recognised, or if
   * responding to a signal fails.
   */
  void waitForEclipse(boolean transferControlWithResume) throws IOException
  {
    boolean isFirstIteration = true;
    ControlSignal nextControlSignal;
    do
    {
      nextControlSignal =
        getNextControlSignal(isFirstIteration, transferControlWithResume);
      if(nextControlSignal == null)
      {
        throw new IOException("Unrecognised ECLiPSe control signal.");
      }
      isFirstIteration = false;
      nextControlSignal.respond();
    }
    while(!(nextControlSignal instanceof YieldSignal));
  }

  /**
   * Obtain the next control signal from ECLiPSe.
   *
   * @param isFirstIteration true on the first call made by a given invocation
   * of {@link #waitForEclipse(boolean)}, false on subsequent calls.
   * @param transferControlWithResume the flag given to waitForEclipse.
   * @return the next signal; waitForEclipse treats <code>null</code> as an
   * unrecognised signal.
   */
  abstract ControlSignal getNextControlSignal(boolean isFirstIteration,
                                              boolean transferControlWithResume)
    throws IOException;


  /**
   * A control signal received from ECLiPSe; <code>respond()</code> performs
   * the Java-side reaction to it.
   */
  abstract class ControlSignal
  {
    abstract void respond() throws IOException;
  }

  /** Signal which ends the loop in waitForEclipse; responds via respondYield(). */
  class YieldSignal extends ControlSignal
  {
    void respond() throws IOException
    {
      respondYield();
    }
  }

  /** Signal carrying a stream number; responds via respondWaitIO(Integer). */
  class WaitIOSignal extends ControlSignal
  {
    private Integer streamID;
    WaitIOSignal(Integer streamID)
    {
      this.streamID = streamID;
    }
    void respond() throws IOException
    {
      respondWaitIO(streamID);
    }
  }

  /**
   * Signal carrying a queue name, stream number and direction; responds via
   * respondOpenQueue(Atom, Integer, Atom).
   */
  class OpenQueueSignal extends ControlSignal
  {
    private Atom nameAtom;
    private Integer streamID;
    private Atom direction;
    OpenQueueSignal(Atom nameAtom, Integer streamID, Atom direction)
    {
      this.nameAtom = nameAtom;
      this.streamID = streamID;
      this.direction = direction;
    }
    void respond() throws IOException
    {
      respondOpenQueue(nameAtom, streamID, direction);
    }
  }

  /**
   * Respond to a yield signal. Does nothing here; subclasses may override.
   */
  void respondYield() throws IOException
  {
  }

  /**
   * Respond to a wait-IO signal by notifying the ToEclipseQueue registered
   * under <code>streamID</code> of a request for data. If no such queue is
   * registered, a message is printed to stderr instead.
   */
  void respondWaitIO(Integer streamID) throws IOException
  {
    // look up the queue in the toEclipseQueue register
    ToEclipseQueue teq = lookupToEclipseQueue(streamID.intValue());
    // if it is not there, print a message to stderr
    if (teq == null) {
      System.err.println("ECLiPSe yielded after reading empty stream "+
                         streamID.intValue() +
                         " which is not registered as a ToEclipseQueue.");
    } else {
      // otherwise notify the queue's listener of a request for data
      // (causes its dataRequest method to be invoked)
      teq.notifyRequest();
    }
  }

  /**
   * Respond to a request to close the queue with stream number
   * <code>streamID</code>. The registers are searched in the order
   * to-ECLiPSe, from-ECLiPSe, asynchronous; the first queue found has its
   * <code>close_cleanup()</code> invoked and its Java side closed. If the
   * number is not registered at all, a message is printed to stderr.
   */
  void respondCloseQueue(Integer streamID) throws IOException
  {
    int id = streamID.intValue();

    ToEclipseQueue teq = lookupToEclipseQueue(id);
    if(teq != null)
    {
      teq.close_cleanup();
      closeToEclipseStreamJavaSide(id);
      return;
    }
    FromEclipseQueue feq = lookupFromEclipseQueue(id);
    if(feq != null)
    {
      feq.close_cleanup();
      closeFromEclipseStreamJavaSide(id);
      return;
    }
    AsyncEclipseQueue aeq = lookupAsyncEclipseQueue(id);
    if(aeq != null)
    {
      aeq.close_cleanup();
      closeAsyncEclipseStreamJavaSide(id);
      return;
    }
    System.err.println("Cannot close "+streamID+": not the "+
                       "stream number of a registered ECLiPSe queue.");
  }


  /**
   * Respond to a request to open a queue by creating and registering the
   * Java-side queue object matching <code>direction</code>:
   * <code>fromec</code>, <code>toec</code> or <code>bidirect</code>.
   * An unrecognised direction is reported on stderr and otherwise ignored.
   *
   * @param nameAtom atom whose functor is the stream name.
   * @param streamID not used by this implementation; the stream number is
   * looked up again from the name when the queue is created.
   * @param direction atom naming the direction of the queue.
   */
  void respondOpenQueue(Atom nameAtom, Integer streamID, Atom direction)
    throws IOException
  {
    String directionName = direction.functor();
    if(directionName.equals("fromec"))
    {
      createFromEclipseQueue(nameAtom.functor());
    }
    else if(directionName.equals("toec"))
    {
      createToEclipseQueue(nameAtom.functor());
    }
    else if(directionName.equals("bidirect"))
    {
      createAsyncEclipseQueue(nameAtom.functor());
    }
    else
    {
      // Previously this case was dropped silently; report it in the same way
      // as the other unexpected-signal conditions above.
      System.err.println("Cannot open queue "+nameAtom.functor()+
                         ": unrecognised direction "+directionName+".");
    }
  }


  /**
   * Read <code>len</code> bytes from this ECLiPSe's stream number
   * <code>streamid</code> and store them in
   * byte array <code>b</code> at offset <code>off</code>.
   *
   * @return the number of bytes read.
   */
  abstract int readFromStream(int streamid, int off, int len, byte[] b)
    throws IOException;

  /**
   * Read a single byte from this ECLiPSe's stream number
   * <code>streamid</code>
   *
   * @return byte read, an int between 0 and 255 or -1 if 0 bytes were read.
   */
  abstract int readByteFromStream(int streamid) throws IOException;

  /**
   * Returns the number of bytes available on stream streamid which may be
   * read or skipped over without blocking.
   */
  abstract int availableOnStream(int streamid) throws IOException;


  /**
   * Write <code>len</code> bytes to this ECLiPSe's stream number
   * <code>streamid</code> at offset <code>off</code> from
   * byte array <code>b</code>.
   *
   * @return the number of bytes written.
   */
  abstract int writeToStream(int streamid, byte[] b, int off, int len)
    throws IOException;

  /**
   * Write a single byte to this ECLiPSe's stream number
   * <code>streamid</code>.
   *
   */
  abstract void writeByteToStream(int streamid, byte b)
    throws IOException;

  /**
   * Flush this ECLiPSe's stream number <code>streamid</code>.
   *
   */
  abstract void flushStream(int streamid) throws IOException;

  // when a ToEclipseStream is closed, unregister it. Subclasses may
  // over-ride this method and perform additional actions, but they should call
  // it first using super.
  void closeToEclipseStreamJavaSide(int streamid) throws IOException
  {
    unregisterToEclipseQueue(streamid);
  }

  // when a AsyncEclipseStream is closed, unregister it. Subclasses may
  // over-ride this method and perform additional actions, but they should call
  // it first using super.
  void closeAsyncEclipseStreamJavaSide(int streamid) throws IOException
  {
    unregisterAsyncEclipseQueue(streamid);
  }

  // when a FromEclipseStream is closed, unregister it. Subclasses may
  // over-ride this method and perform additional actions, but they should call
  // it first using super.
  void closeFromEclipseStreamJavaSide(int streamid) throws IOException
  {
    unregisterFromEclipseQueue(streamid);
  }

  // Common code to close the eclipse side of a From/To/AsyncEclipse queue.
  // These do nothing here. Subclasses will override these methods to perform
  // additional actions, but they should invoke them first using super.
  void closeFromEclipseStreamEclipseSide(int streamid) throws IOException
  {
  }

  void closeToEclipseStreamEclipseSide(int streamid) throws IOException
  {
  }

  void closeAsyncEclipseStreamEclipseSide(int streamid) throws IOException
  {
  }

  /**
   * Return the EclipseMultitaskConnection for this connection, creating it on
   * first use. Creation executes <code>peer_register_multitask(Peer, _)</code>
   * in ECLiPSe, takes the stream name from the second argument of the result
   * and wraps the corresponding FromEclipseQueue in an
   * EclipseMultitaskConnectionImpl.
   */
  synchronized EclipseMultitaskConnection getEclipseMultitaskConnection() throws EclipseException, IOException {
    if ( eclipseMultitaskConnection == null ) {
      CompoundTermImpl resultGoal =
        (CompoundTermImpl)
        rpc("peer_register_multitask",
            getPeerName(),
            null);
      String fromStream = ((Atom)resultGoal.arg(2)).functor();
      FromEclipseQueue queue = getFromEclipseQueue(fromStream);
      eclipseMultitaskConnection =
        new EclipseMultitaskConnectionImpl(this, queue);
    }
    return eclipseMultitaskConnection;
  }

  /**
   * Implements method from EclipseConnection. Obtains (creating if necessary)
   * the multitask connection and registers <code>multitaskListener</code>
   * with it.
   */
  public EclipseMultitaskConnection registerMultitask(MultitaskListener multitaskListener) throws EclipseException,IOException {
    testTerminated();
    EclipseMultitaskConnection emc = getEclipseMultitaskConnection();
    return emc.registerMultitask(multitaskListener);
  }
}