// BEGIN LICENSE BLOCK
// Version: CMPL 1.1
//
// The contents of this file are subject to the Cisco-style Mozilla Public
// License Version 1.1 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License
// at www.eclipse-clp.org/license.
//
// Software distributed under the License is distributed on an "AS IS"
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
// the License for the specific language governing rights and limitations
// under the License.
//
// The Original Code is The ECLiPSe Constraint Logic Programming System.
// The Initial Developer of the Original Code is Cisco Systems, Inc.
// Portions created by the Initial Developer are
// Copyright (C) 2001 - 2006 Cisco Systems, Inc. All Rights Reserved.
//
// Contributor(s): Josh Singer, Parc Technologies
//
// END LICENSE BLOCK
//Title: Java/ECLiPSe interface
//Version: $Id: RemoteEclipse.java,v 1.1 2006/09/23 01:54:12 snovello Exp $
//Author: Josh Singer
//Company: Parc Technologies
//Description: Connection to a remote ECLiPSe.
package com.parctechnologies.eclipse;
import java.io.*;
import java.net.*;
import java.util.*;



/**
 * Remote connection to an existing ECLiPSe process. The connection may be made
 * over a TCP/IP network to an ECLiPSe which has been primed using the
 * <code>remote_connect/3</code> or the <code>remote_connect_setup/3</code>
 * builtin predicates. As well as the functionality provided by the
 * <i>EclipseConnection</i> interface, <i>RemoteEclipse</i> also allows for
 * execution control to be transferred explicitly over to ECLiPSe with the
 * <code>resume()</code> method.
 *
 * <p>The connection is terminated from the Java side using either the
 * <code>disconnect()</code> method (when Java has execution control) or
 * <code>unilateralDisconnect()</code> (when ECLiPSe has execution control).
 * If the builtin predicate <code>remote_disconnect/1</code> is executed on
 * the ECLiPSe side, the effects on the Java side are similar to the
 * effects of <code>disconnect()</code>.
 *
 */
public class RemoteEclipse extends EclipseConnectionImpl implements EclipseConnection
{
  /**
   * Implementation overview
   *
   * I assume familiarity with the ECLiPSe remote interface protocol.
   *
   * The main complication to the implementation of this class is the fact that
   * ECLiPSe sockets have a very small fixed buffer size. Consequences:
   * (1) When a client writes to an Eclipse socket and the socket's buffer
   * reaches capacity, the client's write/flush command blocks indefinitely
   * unless Eclipse is reading from the socket's stream. Because of this block,
   * any read call which the user may set up using rpc is never reached.
   *
   * (2) Conversely, when Eclipse flushes an amount of data through the socket
   * stream which is larger than capacity, its own write command blocks
   * indefinitely unless there is a corresponding read command waiting on the
   * Java side. However, the user cannot set up this read after Eclipse flushes,
   * because control does not return to Java until after the write command
   * completes.
   *
   * The way we handle (1) is to enclose the socket OutputStream in a special
   * NonBlockingOutputStream, whose write and flush methods are non-blocking.
   * A thread is hidden within each NonBlockingOutputStream object. The thread
   * is activated whenever the NonBlockingOutputStream is flushed and handles
   * the write and flush calls to the underlying stream (the socket in this
   * case) concurrently. The hidden thread may block, but the original thread
   * is unblocked. It can therefore go ahead and set up a read call on the
   * Eclipse side, which will remove the blockage, and read the data written by
   * the hidden thread.
   *
   * (2) is handled using a buffer on the Java side.
Before attempting to write 83 * the data, Eclipse sends an ec_flushio signal, indicating the amount about 84 * to be written. This amount of data is immediately read into a buffer. 85 * From there it can be read by the user later. See the 86 * FromEclipseQueueBuffer class for more details. 87 * 88 */ 89 90 91 92 // The socket for sending/receiving control signals 93 private Socket control; 94 // The socket for sending/receiving rpc goals 95 private Socket rpc; 96 // The name of the Eclipse remote connection which this object is connected 97 // to. 98 private String connectionName; 99 // Stream for receiving control signals in EXDR format 100 private EXDRInputStream controlEXDRInput; 101 // Stream for receiving rpc goals in EXDR format 102 private EXDRInputStream rpcEXDRInput; 103 // Stream for sending control signals in EXDR format 104 private EXDROutputStream controlEXDROutput; 105 // Stream for sending rpc goals in EXDR format 106 private EXDROutputStream rpcEXDROutput; 107 // common atoms 108 private static final Atom resumeAtom = new Atom("resume"); 109 private static final Atom rpcAtom = new Atom("rpc"); 110 private static final Atom yieldAtom = new Atom("yield"); 111 private static final Atom disconnectAtom = new Atom("disconnect"); 112 private static final Atom disconnectYieldAtom = new Atom("disconnect_yield"); 113 private static final Atom disconnectResumeAtom = new Atom("disconnect_resume"); 114 private static final Atom syncAtom = new Atom("sync"); 115 private static final Atom asyncAtom = new Atom("async"); 116 private static final Atom fromecAtom = new Atom("fromec"); 117 private static final Atom toecAtom = new Atom("toec"); 118 private static final Atom bidirectAtom = new Atom("bidirect"); 119 private static final Atom emptyAtom = new Atom(""); 120 private static final Atom failAtom = new Atom("fail"); 121 private static final Atom successAtom = new Atom("success"); 122 123 private Map queueInfo = new HashMap(); 124 125 // The address of the machine 
which Eclipse is running on, as read during the 126 // protocol. This should be used in subsequent client socket connections. 127 private InetAddress hostAddress; 128 129 private static final int PROTOCOL_VERSION_NUMBER = 1; 130 131 public static final int DEFAULT_TIMEOUT_MILLIS = 30000; 132 133 /** 134 * Make a connection to an existing ECLiPSe process. The ECLiPSe process must 135 * be on a server machine which is reachable on a TCP/IP network. ECLiPSe must be 136 * primed using <code>remote_connect_setup/3</code>. The builtin predicate 137 * <code>remote_connect_accept/6</code> should then be used to complete the 138 * connection. The connection details 139 * (IP address, port number, password) are specified as parameters and must match those 140 * specified/returned as arguments in the execution of 141 * <code>remote_connect_setup/3</code> and <code>remote_connect_accept/6</code>. 142 * 143 * @throws IOException if the connection could not be made, or times out 144 * within <code>DEFAULT_TIMEOUT_MILLIS</code> milliseconds. 145 */ 146 public RemoteEclipse(InetAddress remoteEclipseHost, int remoteEclipsePort, 147 String passwd) 148 throws IOException 149 { 150 this(remoteEclipseHost, remoteEclipsePort, passwd, DEFAULT_TIMEOUT_MILLIS); 151 } 152 153 /** 154 * Make a connection to an existing ECLiPSe process. The ECLiPSe process must be 155 * 156 * on a server machine which is reachable on a TCP/IP network. ECLiPSe must be 157 * primed using <code>remote_connect/3</code>. The connection details 158 * (IP address, port number) are specified as parameters and must match those 159 * specified/returned as arguments in the execution of 160 * <code>remote_connect/3</code>. If <code>remote_connect_setup/3</code>. was 161 * used to prime ECLiPSe for the remote connection, this constructor will fail 162 * as it does not use a password. 
163 * 164 * @throws IOException if the connection could not be made, or times out 165 * within <code>DEFAULT_TIMEOUT_MILLIS</code> milliseconds. 166 */ 167 public RemoteEclipse(InetAddress remoteEclipseHost, int remoteEclipsePort) 168 throws IOException 169 { 170 this(remoteEclipseHost, remoteEclipsePort, "", DEFAULT_TIMEOUT_MILLIS); 171 } 172 173 /** 174 * Make a connection to an existing ECLiPSe process. The ECLiPSe process must 175 * be on a server machine which is reachable on a TCP/IP network. ECLiPSe must be 176 * primed using <code>remote_connect/3</code>. The connection details 177 * (IP address, port number) are specified as parameters and must match those 178 * specified/returned as arguments in the execution of 179 * <code>remote_connect/3</code>. If <code>remote_connect_setup/3</code>. was 180 * used to prime ECLiPSe for the remote connection, this constructor will fail 181 * as it does not use a password. 182 * 183 * @param timeoutMillis number of milliseconds to wait for the initial 184 * connection to be established before throwing an exception. Set 185 * <code>timeoutMillis</code> to 0 to wait indefinitely for the connection. 186 * 187 * @throws IOException if the connection could not be made, or times out 188 * within <code>timeoutMillis</code> milliseconds. 
189 */ 190 public RemoteEclipse(InetAddress remoteEclipseHost, int remoteEclipsePort, 191 int timeoutMillis) 192 throws IOException 193 { 194 this(remoteEclipseHost, remoteEclipsePort, "", timeoutMillis); 195 } 196 197 private void setUpControl(InetAddress remoteEclipseHost, 198 int remoteEclipsePort) 199 throws IOException 200 { 201 control = new Socket(remoteEclipseHost, remoteEclipsePort); 202 // initialise the EXDR readers and writers on the control socket 203 controlEXDRInput = new EXDRInputStream(control.getInputStream()); 204 controlEXDROutput = new EXDROutputStream(control.getOutputStream()); 205 } 206 207 private void setUpRPC(InetAddress remoteEclipseHost, int remoteEclipsePort) 208 throws IOException 209 { 210 rpc = new Socket(remoteEclipseHost, remoteEclipsePort); 211 // Initialise readers and writers on these sockets 212 rpcEXDRInput = new EXDRInputStream(rpc.getInputStream()); 213 rpcEXDROutput = new EXDROutputStream(rpc.getOutputStream()); 214 } 215 216 217 218 /** 219 * Make a connection to an existing ECLiPSe process. The ECLiPSe process must 220 * be on a server machine which is reachable on a TCP/IP network. ECLiPSe must be 221 * primed using <code>remote_connect_setup/3</code>. The builtin predicate 222 * <code>remote_connect_accept/6</code> should then be used to complete the 223 * connection. The connection details 224 * (IP address, port number, password) are specified as parameters and must match those 225 * specified/returned as arguments in the execution of 226 * <code>remote_connect_setup/3</code> and <code>remote_connect_accept/6</code>. 227 * 228 * @param timeoutMillis number of milliseconds to wait for the initial 229 * connection to be established before throwing an exception. Set 230 * <code>timeoutMillis</code> to 0 to wait indefinitely for the connection. 231 * 232 * @throws IOException if the connection could not be made, or times out 233 * within <code>timeoutMillis</code> milliseconds. 
234 */ 235 public RemoteEclipse(InetAddress remoteEclipseHost, int remoteEclipsePort, 236 String passwd, int timeoutMillis) 237 throws IOException 238 { 239 hostAddress = remoteEclipseHost; 240 241 // System.out.println("starting remote protocol"); 242 243 setUpControl(remoteEclipseHost, remoteEclipsePort); 244 245 CompoundTerm protocolTerm = 246 new CompoundTermImpl("remote_protocol", 247 new Integer(PROTOCOL_VERSION_NUMBER)); 248 249 //System.out.println("protocol term is "+protocolTerm); 250 251 writeControl(protocolTerm); 252 253 Object presponse = readControl(); 254 //System.out.println("presponse is "+presponse); 255 256 if(!presponse.equals("yes")) 257 { 258 //System.out.println("presponse not equal to \"yes\", closing control"); 259 control.close(); 260 throw(new IOException("Remote protocol error: protocol version unsupported.")); 261 } 262 263 //System.out.println("presponse equal to \"yes\", continuing protocol"); 264 265 writeControl(passwd); 266 267 // read the connectionName on the control socket 268 connectionName = ((Atom) readControlTimeout(timeoutMillis)).functor(); 269 270 // write the language name on the control socket 271 writeControl("java"); 272 // System.out.println("Read connection name: "+connectionName); 273 setUpRPC(remoteEclipseHost, remoteEclipsePort); 274 275 if(!connectionName.equals(((Atom) readRPCTimeout(timeoutMillis)).functor())) 276 { 277 throw(new IOException("Remote protocol error.")); 278 } 279 // set up the peer name locally 280 setPeerName(new Atom(connectionName)); 281 282 // System.out.println("remote protocol complete"); 283 } 284 285 286 /** 287 * Terminate the connection with ECLiPSe unilaterally. This method should be 288 * invoked in unforseen circumstances when the connection should be terminated 289 * while ECLiPSe has execution control. After 290 * <code>unilateralDisconnect</code> has 291 * been invoked, public methods invoked on this <i>RemoteEclipse</i> will 292 * throw <i>EclipseTerminatedException</i>s. 
293 * 294 * @throws EclipseTerminatedException if the connection has already been 295 * terminated. 296 */ 297 public void unilateralDisconnect() throws EclipseTerminatedException 298 { 299 shouldDisconnect = false; // Avoid recursive calls to unilateralDisconnect 300 testTerminated(); 301 try 302 { 303 // don't use writeControl here as that might cause a recursive invocation 304 // of unilateralDisconnect. 305 controlEXDROutput.write(disconnectResumeAtom); 306 controlEXDROutput.flush(); 307 } 308 catch(Exception e) {} 309 310 try 311 { 312 terminateJavaSide(); 313 } 314 catch(IOException ioe) {} 315 316 } 317 318 /** 319 * Terminate the remote connection to ECLiPSe. This should be invoked while 320 * Java has control. If ECLiPSe has control then use 321 * <code>unlateralDisconnect</code> instead. After <code>disconnect</code> has 322 * been invoked, public methods invoked on this <i>RemoteEclipse</i> will 323 * throw <i>EclipseTerminatedException</i>s. 324 * 325 * @throws EclipseTerminatedException if the connection has already been 326 * terminated. 327 * @throws IOException if there was a problem communicating with ECLiPSe 328 * during disconnection. 
329 */ 330 public synchronized void disconnect() throws IOException 331 { 332 testTerminated(); 333 // write the disconnect atom on the control connection 334 writeControl(disconnectAtom); 335 336 // read "disconnect_yield" on the control connection 337 Object result = null; 338 try 339 { 340 result = readControl(); 341 } 342 catch(IOException ioe) 343 { 344 // don't mind if the connection has already been lost 345 } 346 347 if(result != null && !result.equals(disconnectYieldAtom)) 348 { 349 throw new IOException("Remote protocol error."); 350 } 351 terminateJavaSide(); 352 } 353 354 355 private void terminateJavaSide() throws IOException 356 { 357 // set this object to terminated 358 terminated = true; 359 360 // close all user queues, but not the eclipse sides 361 closeAllQueues(false); 362 363 // try to close control and rpc sockets, but if this fails, don't worry. 364 try 365 { 366 control.close(); 367 } 368 catch(Exception e){} 369 try 370 { 371 rpc.close(); 372 } 373 catch(Exception e){} 374 375 } 376 377 /** 378 * Explicitly transfer execution control to ECLiPSe. ECLiPSe will resume 379 * execution immediately after the last goal which transferred control to 380 * Java (normally <code>remote_connect/3</code> or 381 * <code>remote_connect_setup/3</code>). This method should not be 382 * invoked while control has been transferred to Java using a 383 * <i>QueueListener</i>. An invocation of <code>resume()</code> should 384 * normally be paired with an execution of the builtin predicate 385 * <code>remote_yield/1</code>, which can return execution control to Java. 386 * 387 * @throws EclipseTerminatedException if the connection to ECLiPSe has been 388 * terminated. 389 * @throws IOException if there was a problem communicating with ECLiPSe. 
390 */ 391 public synchronized void resume() throws IOException 392 { 393 testTerminated(); 394 waitForEclipse(true); 395 } 396 397 // called to perform setup code additional to what is done in 398 // EclipseConnectionImpl 399 void setupFromEclipseQueue(String name) throws EclipseException, IOException 400 { 401 // port to be used on eclipse side for socket 402 int port; 403 // Atoms for name and connectionName (used in rpc calls) 404 Atom nameAtom = new Atom(name); 405 // compound term returned by ECLiPSe during interaction 406 CompoundTerm result1 = null; 407 // write queue_create(QueueName, sync, fromec, '') on the control connection 408 writeControl(new CompoundTermImpl("queue_create", nameAtom, syncAtom, 409 fromecAtom, emptyAtom)); 410 411 // read the next term from the control connection 412 try 413 { 414 result1 = (CompoundTerm) readControl(); 415 } 416 catch(ClassCastException cce) 417 { 418 throw new IOException("Remote interface protocol error."); 419 } 420 421 422 if(result1.equals(yieldAtom)) 423 { 424 throw new EclipseException("Could not create ECLiPSe side of queue."); 425 } 426 427 // System.out.println("result1 = "+result1); 428 429 // check that the response obeys the protocol 430 if(!result1.functor().equals("socket_client") || 431 result1.arity() != 4 || 432 !(result1.arg(1) instanceof Integer) || 433 !(result1.arg(2) instanceof Atom) || 434 !(result1.arg(3) instanceof Atom) || 435 !(result1.arg(4) instanceof Atom) || 436 !((Atom) result1.arg(2)).equals(nameAtom) || 437 !((Atom) result1.arg(3)).functor().equals("sync") || 438 !((Atom) result1.arg(4)).functor().equals("fromec")) 439 { 440 throw new IOException("Remote interface protocol error."); 441 } 442 443 // extract port number from the response 444 port = ((Integer) result1.arg(1)).intValue(); 445 446 setupRemoteFromecQueue(nameAtom, port); 447 448 // send the resume message + wait for yield. 
449 resume(); 450 } 451 452 private void setupRemoteFromecQueue(Atom nameAtom, int port) 453 throws IOException 454 { 455 // result term received during interaction 456 CompoundTerm result2 = null; 457 // Socket to be used for the new queue 458 Socket newSocket; 459 460 // try to connect the new socket to the address specified during the remote 461 // protocol initialisation and the port specified above. 462 try 463 { 464 newSocket = new Socket(hostAddress, port); 465 } 466 catch(IOException e) // thrown if (for example), something else 467 // has stolen the connection. 468 { 469 // inform ECLiPSe that socket creation failed 470 writeControl(new CompoundTermImpl("socket_connect", nameAtom, failAtom)); 471 // throw the exception 472 throw e; 473 } 474 // otherwise inform ECLiPSe that socket creation succeeded 475 writeControl(new CompoundTermImpl("socket_connect", nameAtom, successAtom)); 476 477 // read the next term from the control connection 478 try 479 { 480 result2 = (CompoundTerm) readControl(); 481 } 482 catch(ClassCastException cce) 483 { 484 throw new IOException("Remote interface protocol error."); 485 } 486 487 // check that this obeys the protocol 488 if(!result2.functor().equals("socket_accept") || 489 result2.arity() != 2 || 490 !(result2.arg(1) instanceof Atom) || 491 !(result2.arg(2) instanceof Integer) || 492 !((Atom) result2.arg(1)).equals(nameAtom)) 493 { 494 throw new IOException("Remote interface protocol error."); 495 } 496 // extract the stream number id of the named queue 497 Integer Id = (Integer) (result2.arg(2)); 498 int id = Id.intValue(); 499 500 setupFromecInfo(id, newSocket); 501 } 502 503 504 505 // called to perform setup code additional to what is done in 506 // EclipseConnectionImpl 507 void setupToEclipseQueue(String name) throws EclipseException, IOException 508 { 509 // port to be used on eclipse side for socket 510 int port; 511 // Atoms for name and connectionName (used in rpc calls) 512 Atom nameAtom = new Atom(name); 513 // 
compound term returned by ECLiPSe during interaction 514 CompoundTerm result1 = null; 515 // write queue_create(QueueName, sync, fromec, '') on the control connection 516 writeControl(new CompoundTermImpl("queue_create", nameAtom, syncAtom, 517 toecAtom, emptyAtom)); 518 519 // read the next term from the control connection 520 try 521 { 522 result1 = (CompoundTerm) readControl(); 523 } 524 catch(ClassCastException cce) 525 { 526 throw new IOException("Remote interface protocol error."); 527 } 528 529 530 if(result1.equals(yieldAtom)) 531 { 532 throw new EclipseException("Could not create ECLiPSe side of queue."); 533 } 534 535 // System.out.println("result1 = "+result1); 536 537 // check that the response obeys the protocol 538 if(!result1.functor().equals("socket_client") || 539 result1.arity() != 4 || 540 !(result1.arg(1) instanceof Integer) || 541 !(result1.arg(2) instanceof Atom) || 542 !(result1.arg(3) instanceof Atom) || 543 !(result1.arg(4) instanceof Atom) || 544 !((Atom) result1.arg(2)).equals(nameAtom) || 545 !((Atom) result1.arg(3)).functor().equals("sync") || 546 !((Atom) result1.arg(4)).functor().equals("toec")) 547 { 548 throw new IOException("Remote interface protocol error."); 549 } 550 551 // extract port number from the response 552 port = ((Integer) result1.arg(1)).intValue(); 553 554 setupRemoteToecQueue(nameAtom, port); 555 556 // send the resume message + wait for yield 557 resume(); 558 } 559 560 561 private void setupRemoteToecQueue(Atom nameAtom, int port) 562 throws IOException 563 { 564 // Socket to be used for the new queue 565 Socket newSocket; 566 // compound term returned by ECLiPSe during interaction 567 CompoundTerm result2 = null; 568 // try to connect the new socket to the address specified during the remote 569 // protocol initialisation and the port specified above. 
570 try 571 { 572 newSocket = new Socket(hostAddress, port); 573 } 574 catch(IOException e) // thrown if (for example), something else 575 // has stolen the connection. 576 { 577 // inform ECLiPSe that socket creation failed 578 writeControl(new CompoundTermImpl("socket_connect", nameAtom, failAtom)); 579 // throw the exception 580 throw e; 581 } 582 // otherwise inform ECLiPSe that socket creation succeeded 583 writeControl(new CompoundTermImpl("socket_connect", nameAtom, successAtom)); 584 585 // read the next term from the control connection 586 try 587 { 588 result2 = (CompoundTerm) readControl(); 589 } 590 catch(ClassCastException cce) 591 { 592 throw new IOException("Remote interface protocol error."); 593 } 594 595 // check that this obeys the protocol 596 if(!result2.functor().equals("socket_accept") || 597 result2.arity() != 2 || 598 !(result2.arg(1) instanceof Atom) || 599 !(result2.arg(2) instanceof Integer) || 600 !((Atom) result2.arg(1)).equals(nameAtom)) 601 { 602 throw new IOException("Remote interface protocol error."); 603 } 604 // extract the stream number id of the named queue 605 Integer Id = (Integer) (result2.arg(2)); 606 int id = Id.intValue(); 607 608 this.setupToecInfo(id, newSocket); 609 } 610 611//------------------------------------------------------------------- 612 613 // called to perform setup code additional to what is done in 614 // EclipseConnectionImpl 615 void setupAsyncEclipseQueue(String name) throws EclipseException, IOException 616 { 617 // port to be used on eclipse side for socket 618 int port; 619 // Atoms for name and connectionName (used in rpc calls) 620 Atom nameAtom = new Atom(name); 621 // compound term returned by ECLiPSe during interaction 622 CompoundTerm result1 = null; 623 // write queue_create(QueueName, sync, fromec, '') on the control connection 624 writeControl(new CompoundTermImpl("queue_create", nameAtom, asyncAtom, 625 bidirectAtom, emptyAtom)); 626 627 // read the next term from the control connection 
628 try 629 { 630 result1 = (CompoundTerm) readControl(); 631 } 632 catch(ClassCastException cce) 633 { 634 throw new IOException("Remote interface protocol error."); 635 } 636 637 638 if(result1.equals(yieldAtom)) 639 { 640 throw new EclipseException("Could not create ECLiPSe side of queue."); 641 } 642 643 // System.out.println("result1 = "+result1); 644 645 // check that the response obeys the protocol 646 if(!result1.functor().equals("socket_client") || 647 result1.arity() != 4 || 648 !(result1.arg(1) instanceof Integer) || 649 !(result1.arg(2) instanceof Atom) || 650 !(result1.arg(3) instanceof Atom) || 651 !(result1.arg(4) instanceof Atom) || 652 !((Atom) result1.arg(2)).equals(nameAtom) || 653 !((Atom) result1.arg(3)).functor().equals("async") || 654 !((Atom) result1.arg(4)).functor().equals("bidirect")) 655 { 656 throw new IOException("Remote interface protocol error."); 657 } 658 659 // extract port number from the response 660 port = ((Integer) result1.arg(1)).intValue(); 661 662 setupRemoteAsyncecQueue(nameAtom, port); 663 664 // send the resume message + wait for yield 665 resume(); 666 } 667 668 669 private void setupRemoteAsyncecQueue(Atom nameAtom, int port) 670 throws IOException 671 { 672 // Socket to be used for the new queue 673 Socket newSocket; 674 // compound term returned by ECLiPSe during interaction 675 CompoundTerm result2 = null; 676 // try to connect the new socket to the address specified during the remote 677 // protocol initialisation and the port specified above. 678 try 679 { 680 newSocket = new Socket(hostAddress, port); 681 } 682 catch(IOException e) // thrown if (for example), something else 683 // has stolen the connection. 
684 { 685 // inform ECLiPSe that socket creation failed 686 writeControl(new CompoundTermImpl("socket_connect", nameAtom, failAtom)); 687 // throw the exception 688 throw e; 689 } 690 // otherwise inform ECLiPSe that socket creation succeeded 691 writeControl(new CompoundTermImpl("socket_connect", nameAtom, successAtom)); 692 693 // read the next term from the control connection 694 try 695 { 696 result2 = (CompoundTerm) readControl(); 697 } 698 catch(ClassCastException cce) 699 { 700 throw new IOException("Remote interface protocol error."); 701 } 702 703 // check that this obeys the protocol 704 if(!result2.functor().equals("socket_accept") || 705 result2.arity() != 2 || 706 !(result2.arg(1) instanceof Atom) || 707 !(result2.arg(2) instanceof Integer) || 708 !((Atom) result2.arg(1)).equals(nameAtom)) 709 { 710 throw new IOException("Remote interface protocol error."); 711 } 712 // extract the stream number id of the named queue 713 Integer Id = (Integer) (result2.arg(2)); 714 int id = Id.intValue(); 715 716 setupAsyncecInfo(id, newSocket); 717 } 718//------------------------------------------------------------------- 719 720 boolean shouldDisconnect = false; 721 722 void testTerminated() throws EclipseTerminatedException 723 { 724 if(shouldDisconnect) 725 { 726 unilateralDisconnect(); 727 } 728 super.testTerminated(); 729 } 730 731 // Called if the flush() method of a ToEclipseQueue is called. 
732 synchronized void flushStream(int streamID) throws IOException 733 { 734 // write rem_flushio(Queuenumber, Bytes) on control connection & flush 735 writeControl(new CompoundTermImpl("rem_flushio", 736 new Integer(streamID), 737 new Integer(getBytesBuffered(streamID)))); 738 // get the buffer to flush (in the background) to its destination 739 getOutputStream(streamID).flush(); 740 // Now these bytes have been flushed, reset the byte counter 741 setBytesBuffered(streamID, 0); 742 // hand control to Eclipse and handle any resulting events until yield 743 waitForEclipse(false); 744 } 745 746 747 synchronized void writeByteToStream(int streamid, byte b) throws IOException 748 { 749 getOutputStream(streamid).write(0xff & b); 750 setBytesBuffered(streamid, getBytesBuffered(streamid)+1); 751 } 752 753 754 synchronized int writeToStream(int streamid, byte[] b, int off, int len) throws IOException 755 { 756 getOutputStream(streamid).write(b, off, len); 757 758 // since the above statement cannot have written less than len bytes unless it 759 // threw an exception, if execution reaches the current point, we know len 760 // bytes have been written. 761 762 setBytesBuffered(streamid, getBytesBuffered(streamid)+len); 763 return(len); 764 } 765 766 // Called if the available() method of a FromEclipseQueue is called. 767 synchronized int availableOnStream(int streamID) 768 { 769 return(this.availableInBuffer(streamID)); 770 } 771 772 // called by the read method in FromEclipseQueue. Returns -1 if there are no 773 // bytes in the buffer. 774 synchronized int readByteFromStream(int streamID) throws IOException 775 { 776 return(readByteFromBuffer(streamID)); 777 } 778 779 // called by the read methods of FromEclipseQueue. Returns the number of 780 // bytes read (may be less than len). 
781 synchronized int readFromStream(int streamid, int off, int len, byte[] b) throws IOException 782 { 783 return(readBytesFromBuffer(streamid, b, off, len)); 784 } 785 786 787 ControlSignal getNextControlSignal(boolean isFirstIteration, 788 boolean transferControlWithResume) 789 throws IOException 790 { 791 if(transferControlWithResume || !isFirstIteration) 792 { 793 writeControl(resumeAtom); 794 } 795 796 CompoundTerm nextControlTerm = getNextControlTerm(); 797 798 if(signalsYield(nextControlTerm)) 799 { 800 return(new YieldSignal()); 801 } 802 803 if(signalsMultilateralDisconnect(nextControlTerm)) 804 { 805 return(new MultilateralDisconnectSignal()); 806 } 807 808 if(signalsUnilateralDisconnect(nextControlTerm)) 809 { 810 return(new UnilateralDisconnectSignal()); 811 } 812 813 if(signalsFlushIO(nextControlTerm)) 814 { 815 return(new FlushIOSignal((Integer) nextControlTerm.arg(1), 816 (Integer) nextControlTerm.arg(2))); 817 } 818 819 if(signalsWaitIO(nextControlTerm)) 820 { 821 return(new WaitIOSignal((Integer) nextControlTerm.arg(1))); 822 } 823 824 if(signalsCloseQueue(nextControlTerm)) 825 { 826 return(new CloseQueueSignal((Integer) nextControlTerm.arg(1))); 827 } 828 829 if(signalsOpenQueue(nextControlTerm)) 830 { 831 return(new RemoteOpenQueueSignal((Integer) nextControlTerm.arg(1), 832 (Atom) nextControlTerm.arg(2), 833 (Atom) nextControlTerm.arg(3), 834 (Atom) nextControlTerm.arg(4))); 835 } 836 837 // default, signifies unrecognised signal 838 return(null); 839 } 840 841 842 /** 843 * Send an RPC goal to ECLiPSe. 844 */ 845 void sendGoal(Object goal) throws IOException 846 { 847 writeControl(rpcAtom); 848 writeRPC(goal); 849 // System.out.println("Sent goal on rpc connection: "+goal); 850 } 851 852 /** 853 * Receive an RPC goal from ECLiPSe. 
854 */ 855 Object receiveGoal() throws IOException 856 { 857 return(rpcEXDRInput.readTerm()); 858 } 859 860 void closeFromEclipseStreamJavaSide(int streamid) throws IOException 861 { 862 super.closeFromEclipseStreamJavaSide(streamid); 863 closeFromecSocket(streamid); 864 removeInfo(streamid); 865 } 866 867 void closeToEclipseStreamJavaSide(int streamid) throws IOException 868 { 869 super.closeToEclipseStreamJavaSide(streamid); 870 getOutputStream(streamid).close(); 871 closeToecSocket(streamid); 872 setBytesBuffered(streamid, 0); 873 } 874 875 void closeFromEclipseStreamEclipseSide(int streamid) throws IOException 876 { 877 super.closeFromEclipseStreamEclipseSide(streamid); 878 // write queue_close(streamid) on the control connection 879 writeControl(new CompoundTermImpl("queue_close", 880 new Integer(streamid))); 881 // wait for a yield signal on the control connection 882 Object result = readControl(); 883 if(!result.equals(yieldAtom)) 884 { 885 throw new IOException("Remote protocol error."); 886 } 887 } 888 889 void closeToEclipseStreamEclipseSide(int streamid) throws IOException 890 { 891 super.closeToEclipseStreamEclipseSide(streamid); 892 // write queue_close(streamid) on the control connection 893 writeControl(new CompoundTermImpl("queue_close", 894 new Integer(streamid))); 895 // wait for a yield signal on the control connection 896 Object result = readControl(); 897 if(!result.equals(yieldAtom)) 898 { 899 throw new IOException("Remote protocol error."); 900 } 901 902 } 903 904 void closeAsyncEclipseStreamEclipseSide(int streamid) throws IOException 905 { 906 super.closeAsyncEclipseStreamEclipseSide(streamid); 907 // write queue_close(streamid) on the control connection 908 writeControl(new CompoundTermImpl("queue_close", 909 new Integer(streamid))); 910 // wait for a yield signal on the control connection 911 Object result = readControl(); 912 if(!result.equals(yieldAtom)) 913 { 914 throw new IOException("Remote protocol error."); 915 } 916 917 } 918 919 
/** 920 * This buffer is used to store FromEclipseQueue data flushed by eclipse 921 * through the socket after an ec_flushio signal. The buffer is initialised 922 * with a DataInputStream (wrapped around the Socket's input stream). When 923 * instructed, it can read a certain number of bytes from this stream into 924 * the buffer, using the readBytesFromSocket method. 925 * 926 * These bytes can be read back from the socket using the readByte or 927 * readBytes methods. Any amount can be read at a time as long as it does 928 * not exceed the amount available. 929 * 930 */ 931 private class FromEclipseQueueBuffer 932 { 933 // number of available bytes held in the buffer 934 private int available; 935 // DataInputStream where the bytes come from. We use a DataInputStream 936 // because it has the readFully method which blocks until either the 937 // specified number of bytes have been read, the end-of-file character has 938 // been reached, or there is an IOException 939 private DataInputStream socketInputStream; 940 // A queue storing chunks of bytes. Each chunk is the result of a call to 941 // readBytesFromSocket. 942 private Vector byteChunkVector; 943 // The byte chunk in the buffer from where the first byte should be read 944 private byte[] currentByteChunk; 945 // The number of bytes which have already been read from the first byte 946 // chunk 947 private int readFromCurrentByteChunk; 948 949 // constructor 950 FromEclipseQueueBuffer(InputStream socketInputStream) 951 { 952 // initialise instance variables 953 this.socketInputStream = new DataInputStream(socketInputStream); 954 available = 0; 955 byteChunkVector = new Vector(); 956 readFromCurrentByteChunk = 0; 957 currentByteChunk = null; 958 } 959 // return the number of unread bytes stored in the buffer (some read bytes 960 // will be stored in the currentByteChunk until it is exhausted). 
961 int available() 962 { 963 return(available); 964 } 965 // method to move currentByteChunk on to the next member of the queue. 966 // If the queue is non-empty, the first member is popped and becomes the 967 // current byte chunk. Otherwise the currentByteChunk is set to null. 968 private void nextChunk() 969 { 970 if(byteChunkVector.size() > 0) 971 { 972 currentByteChunk = (byte []) byteChunkVector.remove(0); 973 } 974 else 975 { 976 currentByteChunk = null; 977 } 978 // we read from the beginning of the new currentByteChunk 979 readFromCurrentByteChunk = 0; 980 } 981 982 // Method to read a byte from the buffer. Returns -1 if the buffer is empty 983 int readByte() 984 { 985 byte signed_byte; 986 987 if(available == 0) 988 { 989 return(-1); 990 } 991 // if we have reached the end of the current chunk, move on to the next. 992 // Since available > 0, next chunk cannot be null 993 if(readFromCurrentByteChunk == currentByteChunk.length) 994 { 995 nextChunk(); 996 } 997 // One fewer bytes is available 998 available--; 999 // return the array element from the current byte chunk 1000 signed_byte = currentByteChunk[readFromCurrentByteChunk++]; 1001 // in the byte chunk it is interpreted as between -128 and 127, 1002 // here we want a number between 0 and 255: if it is less than 0, add 256 1003 if(signed_byte < 0) 1004 { 1005 return(signed_byte+256); 1006 } 1007 else 1008 { 1009 return(signed_byte); 1010 } 1011 } 1012 1013 // read <len> bytes from the buffer into array b, at offset <off>. 1014 // returns the number of bytes copied. 1015 int readBytes(byte[] b, int off, int len) 1016 { 1017 // the number of bytes copied into b, the number of unread bytes in the 1018 // current byte chunk and the number of bytes still remaining to be 1019 // copied into b 1020 int bytesCopied = 0, availableInCurrentChunk, bytesRequired; 1021 // while we have not copied enough bytes and there are still bytes to 1022 // copy... 
1023 while(bytesCopied < len && available > 0) 1024 { 1025 // calculate no. of remaining bytes in current chunk 1026 availableInCurrentChunk = 1027 currentByteChunk.length - readFromCurrentByteChunk; 1028 // calculate no. of bytes still to be copied 1029 bytesRequired = len - bytesCopied; 1030 // if we can't get all remaining bytes from the current chunk 1031 if(availableInCurrentChunk < bytesRequired) 1032 { 1033 // copy all remaining bytes from the current chunk into b 1034 System.arraycopy(currentByteChunk, readFromCurrentByteChunk, 1035 b, off + bytesCopied, availableInCurrentChunk); 1036 // update available and bytesCopied accordingly 1037 available -= availableInCurrentChunk; 1038 bytesCopied += availableInCurrentChunk; 1039 // move on to the next chunk 1040 nextChunk(); 1041 } 1042 else // If we can get all remaining bytes from the current chunk into b 1043 { 1044 // copy the required bytes 1045 System.arraycopy(currentByteChunk, readFromCurrentByteChunk, 1046 b, off + bytesCopied, bytesRequired); 1047 // update available, bytesCopied and readFromCurrentByteChunk 1048 // accordingly 1049 available -= bytesRequired; 1050 bytesCopied += bytesRequired; 1051 readFromCurrentByteChunk += bytesRequired; 1052 } 1053 } 1054 // return the number of bytes copied 1055 return(bytesCopied); 1056 } 1057 1058 // read nBytes bytes from the input stream into the buffer. Block until this 1059 // is complete or it becomes impossible due to an exception. 1060 void readBytesFromSocket(int nBytes) throws IOException 1061 { 1062 // don't add an empty chunk if nBytes is 0 1063 if(nBytes == 0) return; 1064 // initialise a byte array for the new chunk 1065 byte[] newChunk = new byte[nBytes]; 1066 // readFully the new chunk from the stream (blocks until complete or 1067 // throws exception) 1068 socketInputStream.readFully(newChunk); 1069 // add the new chunk on the end of the byteChunkVector queue. 
1070 byteChunkVector.add(newChunk); 1071 // if the currentByteChunk was null because all the bytes were exhausted 1072 if(currentByteChunk == null) 1073 { 1074 // move it on to the first chunk 1075 nextChunk(); 1076 } 1077 // update available 1078 available += nBytes; 1079 } 1080 } 1081 1082 private void writeControl(Object message) throws IOException 1083 { 1084 try 1085 { 1086 controlEXDROutput.write(message); 1087 controlEXDROutput.flush(); 1088 } 1089 catch(SocketException e) 1090 { 1091 unilateralDisconnect(); 1092 throw(new EclipseTerminatedException()); 1093 } 1094 } 1095 1096 private void writeRPC(Object message) throws IOException 1097 { 1098 try 1099 { 1100 rpcEXDROutput.write(message); 1101 rpcEXDROutput.flush(); 1102 } 1103 catch(SocketException e) 1104 { 1105 unilateralDisconnect(); 1106 throw(new EclipseTerminatedException()); 1107 } 1108 } 1109 1110 private Object readControl() throws IOException 1111 { 1112 try 1113 { 1114 return(controlEXDRInput.readTerm()); 1115 } 1116 catch(EOFException e) 1117 { 1118 unilateralDisconnect(); 1119 throw(new EclipseTerminatedException()); 1120 } 1121 } 1122 1123 private Object readControlTimeout(int timeoutMillis) throws IOException 1124 { 1125 int old_timeout; 1126 old_timeout = control.getSoTimeout(); 1127 try 1128 { 1129 control.setSoTimeout(timeoutMillis); 1130 return(readControl()); 1131 } 1132 finally 1133 { 1134 control.setSoTimeout(old_timeout); 1135 } 1136 } 1137 1138 1139 private Object readRPCTimeout(int timeoutMillis) throws IOException 1140 { 1141 int old_timeout; 1142 old_timeout = rpc.getSoTimeout(); 1143 try 1144 { 1145 rpc.setSoTimeout(timeoutMillis); 1146 return(readRPC()); 1147 } 1148 finally 1149 { 1150 rpc.setSoTimeout(old_timeout); 1151 } 1152 } 1153 1154 1155 1156 private Object readRPC() throws IOException 1157 { 1158 try 1159 { 1160 return(rpcEXDRInput.readTerm()); 1161 } 1162 catch(EOFException e) 1163 { 1164 unilateralDisconnect(); 1165 throw(new EclipseTerminatedException()); 1166 } 1167 
1168 } 1169 1170 1171 /** 1172 * Finalizer method called when object is to be garbage collected 1173 */ 1174 protected void finalize() throws IOException, EclipseException 1175 { 1176 this.unilateralDisconnect(); 1177 } 1178 1179 1180 private CompoundTerm getNextControlTerm() throws IOException 1181 { 1182 Object nextControlObj = readControl(); 1183 1184 if(!(nextControlObj instanceof CompoundTerm)) 1185 { 1186 throw(new IOException("Remote interface protocol error: control object not CompoundTerm")); 1187 } 1188 return((CompoundTerm) nextControlObj); 1189 1190 } 1191 1192 1193 private boolean signalsYield(CompoundTerm controlTerm) 1194 { 1195 return(controlTerm.equals(yieldAtom)); 1196 } 1197 private boolean signalsMultilateralDisconnect(CompoundTerm controlTerm) 1198 { 1199 return(controlTerm.equals(disconnectAtom)); 1200 } 1201 private boolean signalsUnilateralDisconnect(CompoundTerm controlTerm) 1202 { 1203 return(controlTerm.equals(disconnectYieldAtom)); 1204 } 1205 private boolean signalsFlushIO(CompoundTerm controlTerm) 1206 { 1207 return(controlTerm.functor().equals("ec_flushio") && 1208 controlTerm.arity() == 2 && 1209 controlTerm.arg(1) instanceof Integer && 1210 controlTerm.arg(2) instanceof Integer); 1211 } 1212 private boolean signalsWaitIO(CompoundTerm controlTerm) 1213 { 1214 return(controlTerm.functor().equals("ec_waitio") && 1215 controlTerm.arity() == 1 && 1216 controlTerm.arg(1) instanceof Integer); 1217 } 1218 private boolean signalsCloseQueue(CompoundTerm controlTerm) 1219 { 1220 return(controlTerm.functor().equals("queue_close") && 1221 controlTerm.arity() == 1 && 1222 controlTerm.arg(1) instanceof Integer); 1223 } 1224 private boolean signalsOpenQueue(CompoundTerm controlTerm) 1225 { 1226 return(controlTerm.functor().equals("socket_client") && 1227 controlTerm.arity() == 4 && 1228 controlTerm.arg(1) instanceof Integer && 1229 controlTerm.arg(2) instanceof Atom && 1230 controlTerm.arg(3) instanceof Atom && 1231 controlTerm.arg(4) instanceof 
Atom); 1232 } 1233 1234 1235 private void respondMultilateralDisconnect() 1236 throws IOException 1237 { 1238 //System.out.println("disconnection signal recieved from eclipse"); 1239 // respond with the disconnect_resume acknowledgement 1240 writeControl(disconnectResumeAtom); 1241 //System.out.println("sent disconnect_resume message"); 1242 // clean up java side 1243 terminateJavaSide(); 1244 //System.out.println("completed java side of disconnection"); 1245 throw(new EclipseTerminatedException()); 1246 } 1247 private void respondUnilateralDisconnect() 1248 throws IOException 1249 { 1250 // clean up java side 1251 terminateJavaSide(); 1252 //System.out.println("completed java side of disconnection"); 1253 throw(new EclipseTerminatedException()); 1254 } 1255 private void respondFlushIO(Integer streamID, Integer bytesFlushed) 1256 throws IOException 1257 { 1258 FromEclipseQueue feq; 1259 // look up the FromEclipseQueue based on the supplied stream number 1260 feq = lookupFromEclipseQueue(streamID.intValue()); 1261 // if it is not there, print a message to stderr 1262 if (feq == null) 1263 System.err.println("ECLiPSe yielded after flushing stream "+streamID.intValue() + 1264 " which is not registered as FromEclipseQueue."); 1265 else 1266 { 1267 bufferBytesFromSocket(streamID.intValue(), bytesFlushed.intValue()); 1268 feq.notifyAvailable(); 1269 } 1270 1271 // pass control back to eclipse 1272 1273 } 1274 1275 void respondWaitIO(Integer streamID) throws IOException 1276 { 1277 super.respondWaitIO(streamID); 1278 // waitIO must always be answered with at least one rem_flushio/yield signal 1279 // pair. In this case the number of bytes flushed is 0, in case nothing 1280 // has been written to the queue. If something has been written but not 1281 // flushed, this is the API user's problem. The ECLiPSe-side read should 1282 // just block. 
1283 writeControl(new CompoundTermImpl("rem_flushio", 1284 streamID, 1285 new Integer (0))); 1286 // hand control to Eclipse and handle any resulting events until yield 1287 waitForEclipse(false); 1288 1289 1290 } 1291 1292 private void respondRemoteOpenQueue(Integer port, Atom nameAtom, Atom type, 1293 Atom direction) 1294 throws IOException 1295 { 1296 1297 if(type.equals(syncAtom)) 1298 { 1299 if(direction.equals(toecAtom)) 1300 { 1301 setupRemoteToecQueue(nameAtom, port.intValue()); 1302 createToEclipseQueue(nameAtom.functor()); 1303 1304 return; 1305 } 1306 1307 if(direction.equals(fromecAtom)) 1308 { 1309 setupRemoteFromecQueue(nameAtom, port.intValue()); 1310 createFromEclipseQueue(nameAtom.functor()); 1311 1312 return; 1313 } 1314 1315 throw new IOException("Remote interface protocol error: queue direction not recognised."); 1316 } 1317 else if(type.equals(asyncAtom)) 1318 { 1319 if(direction.equals(bidirectAtom)) 1320 { 1321 setupRemoteAsyncecQueue(nameAtom, port.intValue()); 1322 createAsyncEclipseQueue(nameAtom.functor()); 1323 1324 return; 1325 } 1326 throw new IOException("Remote interface protocol error: queue direction not recognised."); 1327 } 1328 1329 throw new IOException("Remote interface protocol error: queue type not recognised."); 1330 1331 } 1332 1333 class MultilateralDisconnectSignal extends ControlSignal 1334 { 1335 void respond() throws IOException 1336 { 1337 respondMultilateralDisconnect(); 1338 } 1339 } 1340 1341 class UnilateralDisconnectSignal extends ControlSignal 1342 { 1343 void respond() throws IOException 1344 { 1345 respondUnilateralDisconnect(); 1346 } 1347 } 1348 1349 class FlushIOSignal extends ControlSignal 1350 { 1351 private Integer streamID, bytesFlushed; 1352 1353 FlushIOSignal(Integer streamID, Integer bytesFlushed) 1354 { 1355 this.streamID = streamID; 1356 this.bytesFlushed = bytesFlushed; 1357 } 1358 1359 void respond() throws IOException 1360 { 1361 respondFlushIO(streamID, bytesFlushed); 1362 } 1363 } 1364 1365 
class CloseQueueSignal extends ControlSignal 1366 { 1367 private Integer streamID; 1368 1369 CloseQueueSignal(Integer streamID) 1370 { 1371 this.streamID = streamID; 1372 } 1373 1374 void respond() throws IOException 1375 { 1376 respondCloseQueue(streamID); 1377 } 1378 } 1379 1380 class RemoteOpenQueueSignal extends ControlSignal 1381 { 1382 private Integer port; 1383 private Atom nameAtom; 1384 private Atom type; 1385 private Atom direction; 1386 1387 RemoteOpenQueueSignal(Integer port, Atom nameAtom, Atom type, Atom direction) 1388 { 1389 this.port = port; 1390 this.nameAtom = nameAtom; 1391 this.type = type; 1392 this.direction = direction; 1393 } 1394 1395 void respond() throws IOException 1396 { 1397 respondRemoteOpenQueue(port, nameAtom, type, direction); 1398 } 1399 } 1400 1401 private FromEclipseQueueInfo getFromecInfo(int streamID) 1402 { 1403 return((FromEclipseQueueInfo) queueInfo.get(new Integer(streamID))); 1404 } 1405 1406 private void bufferBytesFromSocket(int streamID, int nBytes) 1407 throws IOException 1408 { 1409 try 1410 { 1411 getFromecInfo(streamID).getBuffer().readBytesFromSocket(nBytes); 1412 } 1413 catch(EOFException e) 1414 { 1415 unilateralDisconnect(); 1416 throw new EclipseTerminatedException(); 1417 } 1418 } 1419 1420 private void closeFromecSocket(int streamID) throws IOException 1421 { 1422 getFromecInfo(streamID).getSocket().close(); 1423 } 1424 1425 private void removeInfo(int streamID) 1426 { 1427 queueInfo.remove(new Integer(streamID)); 1428 } 1429 1430 private void setupFromecInfo(int streamID, Socket socket) throws IOException 1431 { 1432 1433 queueInfo.put(new Integer(streamID), new FromEclipseQueueInfo(socket)); 1434 } 1435 1436 private int readByteFromBuffer(int streamID) 1437 { 1438 return(getFromecInfo(streamID).readByteFromBuffer()); 1439 } 1440 1441 private int readBytesFromBuffer(int streamID, byte[] b, int off, int len) 1442 { 1443 return(getFromecInfo(streamID).readBytesFromBuffer(b, off, len)); 1444 } 1445 1446 
private int availableInBuffer(int streamID) 1447 { 1448 return(getFromecInfo(streamID).availableInBuffer()); 1449 } 1450 1451 private AsyncEclipseQueueInfo getAsyncecInfo(int streamID) 1452 { 1453 return((AsyncEclipseQueueInfo) queueInfo.get(new Integer(streamID))); 1454 } 1455 1456 private void closeAsyncecSocket(int streamID) throws IOException 1457 { 1458 getAsyncecInfo(streamID).getSocket().close(); 1459 } 1460 1461 private void setupAsyncecInfo(int streamID, Socket socket) throws IOException 1462 { 1463 1464 queueInfo.put(new Integer(streamID), new AsyncEclipseQueueInfo(socket)); 1465 } 1466 1467 InputStream getAsyncInputStream(int streamID) throws IOException 1468 { 1469 return getAsyncecInfo(streamID).getInputStream(); 1470 } 1471 1472 OutputStream getAsyncOutputStream(int streamID) throws IOException 1473 { 1474 return getAsyncecInfo(streamID).getOutputStream(); 1475 } 1476 1477 void closeAsyncEclipseStreamJavaSide(int streamid) throws IOException 1478 { 1479 super.closeAsyncEclipseStreamJavaSide(streamid); 1480 closeAsyncecSocket(streamid); 1481 } 1482 1483 private class FromEclipseQueueInfo 1484 { 1485 private Socket socket; 1486 private FromEclipseQueueBuffer fromEclipseQueueBuffer; 1487 FromEclipseQueueInfo(Socket socket) throws IOException 1488 { 1489 this.socket = socket; 1490 this.fromEclipseQueueBuffer 1491 = new FromEclipseQueueBuffer(socket.getInputStream()); 1492 } 1493 1494 FromEclipseQueueBuffer getBuffer() 1495 { 1496 return(fromEclipseQueueBuffer); 1497 } 1498 1499 Socket getSocket() 1500 { 1501 return(socket); 1502 } 1503 1504 int readByteFromBuffer() 1505 { 1506 return(fromEclipseQueueBuffer.readByte()); 1507 } 1508 int readBytesFromBuffer(byte[] b, int off, int len) 1509 { 1510 return(fromEclipseQueueBuffer.readBytes(b, off, len)); 1511 } 1512 int availableInBuffer() 1513 { 1514 return(fromEclipseQueueBuffer.available()); 1515 } 1516 1517 } 1518 1519 private class AsyncEclipseQueueInfo 1520 { 1521 private Socket socket; 1522 1523 
AsyncEclipseQueueInfo(Socket socket) throws IOException 1524 { 1525 this.socket = socket; 1526 } 1527 1528 Socket getSocket() 1529 { 1530 return(socket); 1531 } 1532 1533 InputStream getInputStream() throws IOException 1534 { 1535 return(socket.getInputStream()); 1536 } 1537 1538 OutputStream getOutputStream() throws IOException 1539 { 1540 return(socket.getOutputStream()); 1541 } 1542 } 1543 1544 1545 1546 private ToEclipseQueueInfo getToecInfo(int streamID) 1547 { 1548 return((ToEclipseQueueInfo) queueInfo.get(new Integer(streamID))); 1549 } 1550 1551 private void closeToecSocket(int streamID) throws IOException 1552 { 1553 getToecInfo(streamID).getSocket().close(); 1554 } 1555 1556 private void setupToecInfo(int streamID, Socket socket) throws IOException 1557 { 1558 1559 queueInfo.put(new Integer(streamID), new ToEclipseQueueInfo(socket)); 1560 } 1561 1562 private void setBytesBuffered(int streamID, int newVal) 1563 { 1564 getToecInfo(streamID).setBytesBuffered(newVal); 1565 } 1566 1567 private int getBytesBuffered(int streamID) 1568 { 1569 return(getToecInfo(streamID).getBytesBuffered()); 1570 } 1571 1572 private OutputStream getOutputStream(int streamID) 1573 { 1574 return(getToecInfo(streamID).getOutputStream()); 1575 } 1576 1577 private class ToEclipseQueueInfo 1578 { 1579 private Socket socket; 1580 private int bytesBuffered; 1581 private OutputStream outputStream; 1582 1583 void setBytesBuffered(int newValue) 1584 { 1585 bytesBuffered = newValue; 1586 } 1587 int getBytesBuffered() 1588 { 1589 return(bytesBuffered); 1590 } 1591 OutputStream getOutputStream() 1592 { 1593 return(outputStream); 1594 } 1595 1596 ToEclipseQueueInfo(Socket socket) throws IOException 1597 { 1598 this.socket = socket; 1599 this.outputStream = new NonBlockingOutputStream(socket.getOutputStream()); 1600 bytesBuffered = 0; 1601 } 1602 1603 Socket getSocket() 1604 { 1605 return(socket); 1606 } 1607 1608 } 1609 1610 /** 1611 * An OutputStream whose write and flush methods do not block. 
This is useful 1612 * for when we would like to write large amounts of data to an output stream 1613 * but where that OutputStream's write/flush methods would normally block when 1614 * given such a large amount of input data. 1615 * 1616 * An instance is initialised by passing it a reference to the underlying 1617 * OutputStream. 1618 * 1619 * NonBlockingOutputStream's write methods store the bytes in a buffer and 1620 * therefore do not block. 1621 * 1622 * The flush method clears the buffer and puts its contents as a chunk of bytes 1623 * on a ByteChunkQueue. There is a "consumer" thread at the other end of the 1624 * queue, reading chunks of bytes and writing them to the underlying stream, 1625 * flushing it after each chunk. 1626 * 1627 * The write/flush methods will not raise IOExceptions by writing to the 1628 * buffer. However, if the copier thread was thrown an IOException when trying 1629 * to write to or flush the underlying stream, this IOException is stored, 1630 * and thrown at the next call of flush, write or close, with the text written 1631 * within an appropriate message. The close method may throw 1632 * IOExceptions either because an IOException from before was stored, or 1633 * because one was raised during its own operations. 
1634 * 1635 * 1636 */ 1637 private class NonBlockingOutputStream extends OutputStream 1638 { 1639 // The underlying stream where bytes are eventually written by the copier 1640 // thread 1641 private OutputStream underlying_stream; 1642 1643 // The byte array where bytes are initially written 1644 private ByteArrayOutputStream bufferStream; 1645 1646 // the copier thread (member class) 1647 private CopierThread copierThread; 1648 1649 // if an IOException was thrown during writing to the underlying stream, 1650 // it is stored using this reference 1651 private IOException thrownException; 1652 1653 // the queue on which chunks of bytes are queued, one chunk for each time the 1654 // flush method is called 1655 private ByteChunkQueue byteChunkQueue; 1656 1657 // constructor 1658 NonBlockingOutputStream(OutputStream underlying_stream) 1659 { 1660 // Initialise instance variables 1661 thrownException = null; 1662 this.underlying_stream = underlying_stream; 1663 bufferStream = new ByteArrayOutputStream(); 1664 byteChunkQueue = new ByteChunkQueue(); 1665 copierThread = new CopierThread(); 1666 // Start the copier thread 1667 copierThread.start(); 1668 } 1669 1670 // Method to throw the last IOException, wrapped in an appropriate message. 1671 // Also, resets the stored exception. 1672 private void throwLastIOException() throws IOException 1673 { 1674 if(thrownException != null) 1675 { 1676 throw(thrownException); 1677 } 1678 } 1679 1680 // The public write methods work as follows: 1681 // 1. they throw any left-over IOExceptions 1682 // 2. 
they write the data to the buffer 1683 public void write(int b) throws IOException 1684 { 1685 throwLastIOException(); 1686 bufferStream.write(b); 1687 } 1688 1689 public void write(byte[] b) throws IOException 1690 { 1691 throwLastIOException(); 1692 bufferStream.write(b); 1693 } 1694 1695 public void write(byte[] b, int off, int len) throws IOException 1696 { 1697 throwLastIOException(); 1698 bufferStream.write(b, off, len); 1699 } 1700 1701 // returns the buffer contents as a byte array and clears the buffer. 1702 private byte[] getByteArrayAndReset() 1703 { 1704 byte[] bytes = bufferStream.toByteArray(); 1705 bufferStream.reset(); 1706 return(bytes); 1707 } 1708 1709 // reads a chunk from the front of the queue, writes it to the underlying 1710 // stream and flushes it. Any IOExceptions from these operations are stored 1711 // in thrownException 1712 private void copyAndFlush() 1713 { 1714 // Get the next chunk of bytes from the buffer. 1715 byte[] copy_chunk = byteChunkQueue.retrieveChunk(); 1716 1717 try 1718 { 1719 // Try to write them to the buffer 1720 underlying_stream.write(copy_chunk); 1721 1722 // If this works, try to flush underlying stream 1723 underlying_stream.flush(); 1724 } 1725 catch(IOException ioe) 1726 { 1727 // Store any thrown exception and make the next invocation of 1728 // testTerminated disconnect unilaterally. 1729 thrownException = ioe; 1730 shouldDisconnect = true; 1731 } 1732 1733 } 1734 1735 // throws any leftover exception. 1736 // clears the buffer and stores its bytes as a chunk on the end of the queue 1737 // it wakes up any threads waiting on the queue. 
1738 public void flush() throws IOException 1739 { 1740 // throw any Exceptions if there are any 1741 throwLastIOException(); 1742 // if there are any bytes in the buffer 1743 if(bufferStream.size() > 0) 1744 { 1745 // get the buffer as a byte chunk and reset it 1746 byte[] byteChunk = getByteArrayAndReset(); 1747 // add the chunk to the queue 1748 byteChunkQueue.addChunk(byteChunk); 1749 // wake up the copier thread 1750 copierThread.wake(); 1751 } 1752 } 1753 1754 // NOTE: does not wait until byte chunk queue is empty. Does wait until 1755 // copierThread is dead. 1756 public void close() throws IOException 1757 { 1758 // throw any stored exceptions 1759 throwLastIOException(); 1760 // stop the copier thread 1761 copierThread.terminate(); 1762 1763 copierThread.interrupt(); // in case copierthread waiting and has not been 1764 // notified 1765 1766 // keep sleeping 1/4 of a second until copierThread is dead. 1767 while(copierThread.isAlive()) 1768 { 1769 try 1770 { 1771 Thread.currentThread().sleep(250); 1772 } 1773 catch(InterruptedException ie) 1774 { 1775 } 1776 } 1777 // close the underlying stream 1778 underlying_stream.close(); 1779 } 1780 1781 // Object to represent the queue of byte chunks 1782 private class ByteChunkQueue 1783 { 1784 // represent the queue using Vector 1785 Vector queue; 1786 // constructor 1787 ByteChunkQueue() 1788 { 1789 queue = new Vector(); 1790 } 1791 1792 // synchronized methods for adding/retrieving and checking/waiting if empty. 
1793 synchronized boolean isEmpty() 1794 { 1795 return(queue.isEmpty()); 1796 } 1797 1798 synchronized void waitUntilEmpty() 1799 { 1800 while(!isEmpty()) 1801 { 1802 try 1803 { 1804 wait(); 1805 } 1806 catch(InterruptedException ie) {} 1807 } 1808 } 1809 1810 synchronized void addChunk(byte[] byteChunk) 1811 { 1812 queue.add(byteChunk); 1813 } 1814 1815 synchronized byte[] retrieveChunk() 1816 { 1817 byte[] removedChunk = (byte[]) queue.remove(0); 1818 notifyAll(); // to wake anything waiting until empty. 1819 return(removedChunk); 1820 } 1821 } 1822 1823 // class to implement thread which will copy and flush byte chunks 1824 private class CopierThread extends Thread 1825 { 1826 private boolean active = true; 1827 1828 public CopierThread() { 1829 super(); 1830 setDaemon(true); 1831 } 1832 1833 // wait until either terminate or wake is called by another thread 1834 synchronized void waitWoken() 1835 { 1836 try 1837 { 1838 this.wait(1000); // timeout of one second 1839 // NOTE a timeout is needed because the notify may happen while the 1840 // copier thread is not in the wait state. 1841 } 1842 catch(InterruptedException ie) 1843 {} 1844 } 1845 1846 synchronized void terminate() 1847 { 1848 active = false; 1849 notifyAll(); 1850 } 1851 1852 synchronized void wake() 1853 { 1854 notifyAll(); 1855 } 1856 1857 // Main method run when thread is started. 1858 public void run() 1859 { 1860 // Repeat this loop until terminated 1861 while(active) 1862 { 1863 // if the byteChunkQueue still has chunks, copy and flush them 1864 if(!byteChunkQueue.isEmpty()) 1865 { 1866 copyAndFlush(); 1867 } 1868 // if it is empty, wait until woken by terminate or wake 1869 else 1870 { 1871 this.waitWoken(); 1872 } 1873 } 1874 } 1875 } 1876 } 1877 1878 1879 1880} 1881