1/* 2 * Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 26package com.sun.corba.se.impl.transport; 27 28import java.io.IOException; 29import java.net.InetSocketAddress; 30import java.net.Socket; 31import java.nio.ByteBuffer; 32import java.nio.channels.SelectableChannel; 33import java.nio.channels.SelectionKey; 34import java.nio.channels.SocketChannel; 35import java.security.AccessController; 36import java.security.PrivilegedAction; 37import java.util.Collections; 38import java.util.Hashtable; 39import java.util.HashMap; 40import java.util.Map; 41 42import org.omg.CORBA.COMM_FAILURE; 43import org.omg.CORBA.CompletionStatus; 44import org.omg.CORBA.DATA_CONVERSION; 45import org.omg.CORBA.INTERNAL; 46import org.omg.CORBA.MARSHAL; 47import org.omg.CORBA.OBJECT_NOT_EXIST; 48import org.omg.CORBA.SystemException; 49 50import com.sun.org.omg.SendingContext.CodeBase; 51 52import com.sun.corba.se.pept.broker.Broker; 53import com.sun.corba.se.pept.encoding.InputObject; 54import com.sun.corba.se.pept.encoding.OutputObject; 55import com.sun.corba.se.pept.protocol.MessageMediator; 56import com.sun.corba.se.pept.transport.Acceptor; 57import com.sun.corba.se.pept.transport.Connection; 58import com.sun.corba.se.pept.transport.ConnectionCache; 59import com.sun.corba.se.pept.transport.ContactInfo; 60import com.sun.corba.se.pept.transport.EventHandler; 61import com.sun.corba.se.pept.transport.InboundConnectionCache; 62import com.sun.corba.se.pept.transport.OutboundConnectionCache; 63import com.sun.corba.se.pept.transport.ResponseWaitingRoom; 64import com.sun.corba.se.pept.transport.Selector; 65 66import com.sun.corba.se.spi.ior.IOR; 67import com.sun.corba.se.spi.ior.iiop.GIOPVersion; 68import com.sun.corba.se.spi.logging.CORBALogDomains; 69import com.sun.corba.se.spi.orb.ORB ; 70import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException; 71import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException; 72import com.sun.corba.se.spi.orbutil.threadpool.Work; 73import 
com.sun.corba.se.spi.protocol.CorbaMessageMediator; 74import com.sun.corba.se.spi.transport.CorbaContactInfo; 75import com.sun.corba.se.spi.transport.CorbaConnection; 76import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom; 77import com.sun.corba.se.spi.transport.ReadTimeouts; 78 79import com.sun.corba.se.impl.encoding.CachedCodeBase; 80import com.sun.corba.se.impl.encoding.CDRInputStream_1_0; 81import com.sun.corba.se.impl.encoding.CDROutputObject; 82import com.sun.corba.se.impl.encoding.CDROutputStream_1_0; 83import com.sun.corba.se.impl.encoding.CodeSetComponentInfo; 84import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry; 85import com.sun.corba.se.impl.logging.ORBUtilSystemException; 86import com.sun.corba.se.impl.orbutil.ORBConstants; 87import com.sun.corba.se.impl.orbutil.ORBUtility; 88import com.sun.corba.se.impl.protocol.giopmsgheaders.Message; 89import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase; 90import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl; 91 92/** 93 * @author Harold Carr 94 */ 95public class SocketOrChannelConnectionImpl 96 extends 97 EventHandlerBase 98 implements 99 CorbaConnection, 100 Work 101{ 102 public static boolean dprintWriteLocks = false; 103 104 // 105 // New transport. 106 // 107 108 protected long enqueueTime; 109 110 protected SocketChannel socketChannel; 111 public SocketChannel getSocketChannel() 112 { 113 return socketChannel; 114 } 115 116 // REVISIT: 117 // protected for test: genericRPCMSGFramework.IIOPConnection constructor. 118 protected CorbaContactInfo contactInfo; 119 protected Acceptor acceptor; 120 protected ConnectionCache connectionCache; 121 122 // 123 // From iiop.Connection.java 124 // 125 126 protected Socket socket; // The socket used for this connection. 127 protected long timeStamp = 0; 128 protected boolean isServer = false; 129 130 // Start at some value other than zero since this is a magic 131 // value in some protocols. 
132 protected int requestId = 5; 133 protected CorbaResponseWaitingRoom responseWaitingRoom; 134 protected int state; 135 protected java.lang.Object stateEvent = new java.lang.Object(); 136 protected java.lang.Object writeEvent = new java.lang.Object(); 137 protected boolean writeLocked; 138 protected int serverRequestCount = 0; 139 140 // Server request map: used on the server side of Connection 141 // Maps request ID to IIOPInputStream. 142 Map serverRequestMap = null; 143 144 // This is a flag associated per connection telling us if the 145 // initial set of sending contexts were sent to the receiver 146 // already... 147 protected boolean postInitialContexts = false; 148 149 // Remote reference to CodeBase server (supplies 150 // FullValueDescription, among other things) 151 protected IOR codeBaseServerIOR; 152 153 // CodeBase cache for this connection. This will cache remote operations, 154 // handle connecting, and ensure we don't do any remote operations until 155 // necessary. 156 protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this); 157 158 protected ORBUtilSystemException wrapper ; 159 160 // transport read timeout values 161 protected ReadTimeouts readTimeouts; 162 163 protected boolean shouldReadGiopHeaderOnly; 164 165 // A message mediator used when shouldReadGiopHeaderOnly is 166 // true to maintain request message state across execution in a 167 // SelectorThread and WorkerThread. 168 protected CorbaMessageMediator partialMessageMediator = null; 169 170 // Used in genericRPCMSGFramework test. 171 protected SocketOrChannelConnectionImpl(ORB orb) 172 { 173 this.orb = orb; 174 wrapper = ORBUtilSystemException.get( orb, 175 CORBALogDomains.RPC_TRANSPORT ) ; 176 177 setWork(this); 178 responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this); 179 setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts()); 180 } 181 182 // Both client and servers. 
/**
     * Common constructor step for client and server connections:
     * runs the base constructor, then records the threading choices.
     *
     * @param orb                   the owning ORB
     * @param useSelectThreadToWait whether read events are awaited on
     *                              the selector thread
     * @param useWorkerThread       whether events are handed to a
     *                              worker thread
     */
    protected SocketOrChannelConnectionImpl(ORB orb,
                                            boolean useSelectThreadToWait,
                                            boolean useWorkerThread)
    {
        this(orb) ;
        setUseSelectThreadToWait(useSelectThreadToWait);
        setUseWorkerThreadForEvent(useWorkerThread);
    }

    /**
     * Client constructor: opens a socket to {@code hostname:port}
     * through the ORB's socket factory and leaves the connection in
     * state OPENING.
     *
     * @param orb                   the owning ORB
     * @param contactInfo           contact info this connection serves
     * @param useSelectThreadToWait whether read events are awaited on
     *                              the selector thread; forced to false
     *                              when the socket has no channel
     * @param useWorkerThread       whether events are handed to a
     *                              worker thread
     * @param socketType            socket type passed to the factory
     * @param hostname              host to connect to
     * @param port                  port to connect to
     * @throws org.omg.CORBA.SystemException the wrapper's connectFailure
     *         exception when the socket cannot be created or configured
     */
    public SocketOrChannelConnectionImpl(ORB orb,
                                         CorbaContactInfo contactInfo,
                                         boolean useSelectThreadToWait,
                                         boolean useWorkerThread,
                                         String socketType,
                                         String hostname,
                                         int port)
    {
        this(orb, useSelectThreadToWait, useWorkerThread);

        this.contactInfo = contactInfo;

        try {
            socket = orb.getORBData().getSocketFactory()
                .createSocket(socketType,
                              new InetSocketAddress(hostname, port));
            socketChannel = socket.getChannel();

            if (socketChannel != null) {
                // A channel waited on by the selector must be non-blocking.
                socketChannel.configureBlocking(!useSelectThreadToWait);
            } else {
                // IMPORTANT: non-channel-backed sockets must use
                // dedicated reader threads.
                setUseSelectThreadToWait(false);
            }
            if (orb.transportDebugFlag) {
                dprint(".initialize: connection created: " + socket);
            }
        } catch (Throwable t) {
            // The socket may already be open when a later setup step
            // fails; close it so the descriptor is not leaked.
            if (socket != null) {
                try {
                    socket.close();
                } catch (Exception ignored) {
                    // Best effort only: the connect failure reported
                    // below is the error the caller needs to see.
                }
            }
            throw wrapper.connectFailure(t, socketType, hostname,
                                         Integer.toString(port));
        }
        state = OPENING;
    }

    /**
     * Client-side convenience constructor: takes both threading
     * choices from the ORB's configuration data.
     *
     * @param orb         the owning ORB
     * @param contactInfo contact info this connection serves
     * @param socketType  socket type passed to the factory
     * @param hostname    host to connect to
     * @param port        port to connect to
     */
    public SocketOrChannelConnectionImpl(ORB orb,
                                         CorbaContactInfo contactInfo,
                                         String socketType,
                                         String hostname,
                                         int port)
    {
        this(orb, contactInfo,
             orb.getORBData().connectionSocketUseSelectThreadToWait(),
             orb.getORBData().connectionSocketUseWorkerThreadForEvent(),
             socketType, hostname, port);
    }

    // Server-side constructor.
243 public SocketOrChannelConnectionImpl(ORB orb, 244 Acceptor acceptor, 245 Socket socket, 246 boolean useSelectThreadToWait, 247 boolean useWorkerThread) 248 { 249 this(orb, useSelectThreadToWait, useWorkerThread); 250 251 this.socket = socket; 252 socketChannel = socket.getChannel(); 253 if (socketChannel != null) { 254 // REVISIT 255 try { 256 boolean isBlocking = !useSelectThreadToWait; 257 socketChannel.configureBlocking(isBlocking); 258 } catch (IOException e) { 259 RuntimeException rte = new RuntimeException(); 260 rte.initCause(e); 261 throw rte; 262 } 263 } 264 this.acceptor = acceptor; 265 266 serverRequestMap = Collections.synchronizedMap(new HashMap()); 267 isServer = true; 268 269 state = ESTABLISHED; 270 } 271 272 // Server-side convenience 273 public SocketOrChannelConnectionImpl(ORB orb, 274 Acceptor acceptor, 275 Socket socket) 276 { 277 this(orb, acceptor, socket, 278 (socket.getChannel() == null 279 ? false 280 : orb.getORBData().connectionSocketUseSelectThreadToWait()), 281 (socket.getChannel() == null 282 ? false 283 : orb.getORBData().connectionSocketUseWorkerThreadForEvent())); 284 } 285 286 //////////////////////////////////////////////////// 287 // 288 // framework.transport.Connection 289 // 290 291 public boolean shouldRegisterReadEvent() 292 { 293 return true; 294 } 295 296 public boolean shouldRegisterServerReadEvent() 297 { 298 return true; 299 } 300 301 public boolean read() 302 { 303 try { 304 if (orb.transportDebugFlag) { 305 dprint(".read->: " + this); 306 } 307 CorbaMessageMediator messageMediator = readBits(); 308 if (messageMediator != null) { 309 // Null can happen when client closes stream 310 // causing purgecalls. 
311 return dispatch(messageMediator); 312 } 313 return true; 314 } finally { 315 if (orb.transportDebugFlag) { 316 dprint(".read<-: " + this); 317 } 318 } 319 } 320 321 protected CorbaMessageMediator readBits() 322 { 323 try { 324 325 if (orb.transportDebugFlag) { 326 dprint(".readBits->: " + this); 327 } 328 329 MessageMediator messageMediator; 330 // REVISIT - use common factory base class. 331 if (contactInfo != null) { 332 messageMediator = 333 contactInfo.createMessageMediator(orb, this); 334 } else if (acceptor != null) { 335 messageMediator = acceptor.createMessageMediator(orb, this); 336 } else { 337 throw 338 new RuntimeException("SocketOrChannelConnectionImpl.readBits"); 339 } 340 return (CorbaMessageMediator) messageMediator; 341 342 } catch (ThreadDeath td) { 343 if (orb.transportDebugFlag) { 344 dprint(".readBits: " + this + ": ThreadDeath: " + td, td); 345 } 346 try { 347 purgeCalls(wrapper.connectionAbort(td), false, false); 348 } catch (Throwable t) { 349 if (orb.transportDebugFlag) { 350 dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t); 351 } 352 } 353 throw td; 354 } catch (Throwable ex) { 355 if (orb.transportDebugFlag) { 356 dprint(".readBits: " + this + ": Throwable: " + ex, ex); 357 } 358 359 try { 360 if (ex instanceof INTERNAL) { 361 sendMessageError(GIOPVersion.DEFAULT_VERSION); 362 } 363 } catch (IOException e) { 364 if (orb.transportDebugFlag) { 365 dprint(".readBits: " + this + 366 ": sendMessageError: IOException: " + e, e); 367 } 368 } 369 // REVISIT - make sure reader thread is killed. 370 Selector selector = orb.getTransportManager().getSelector(0); 371 if (selector != null) { 372 selector.unregisterForEvent(this); 373 } 374 // Notify anyone waiting. 375 purgeCalls(wrapper.connectionAbort(ex), true, false); 376 // REVISIT 377 //keepRunning = false; 378 // REVISIT - if this is called after purgeCalls then 379 // the state of the socket is ABORT so the writeLock 380 // in close throws an exception. 
It is ignored but 381 // causes IBM (screen scraping) tests to fail. 382 //close(); 383 } finally { 384 if (orb.transportDebugFlag) { 385 dprint(".readBits<-: " + this); 386 } 387 } 388 return null; 389 } 390 391 protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator) 392 { 393 try { 394 395 if (orb.transportDebugFlag) { 396 dprint(".finishReadingBits->: " + this); 397 } 398 399 // REVISIT - use common factory base class. 400 if (contactInfo != null) { 401 messageMediator = 402 contactInfo.finishCreatingMessageMediator(orb, this, messageMediator); 403 } else if (acceptor != null) { 404 messageMediator = 405 acceptor.finishCreatingMessageMediator(orb, this, messageMediator); 406 } else { 407 throw 408 new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits"); 409 } 410 return (CorbaMessageMediator) messageMediator; 411 412 } catch (ThreadDeath td) { 413 if (orb.transportDebugFlag) { 414 dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td); 415 } 416 try { 417 purgeCalls(wrapper.connectionAbort(td), false, false); 418 } catch (Throwable t) { 419 if (orb.transportDebugFlag) { 420 dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t); 421 } 422 } 423 throw td; 424 } catch (Throwable ex) { 425 if (orb.transportDebugFlag) { 426 dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex); 427 } 428 429 try { 430 if (ex instanceof INTERNAL) { 431 sendMessageError(GIOPVersion.DEFAULT_VERSION); 432 } 433 } catch (IOException e) { 434 if (orb.transportDebugFlag) { 435 dprint(".finishReadingBits: " + this + 436 ": sendMessageError: IOException: " + e, e); 437 } 438 } 439 // REVISIT - make sure reader thread is killed. 440 orb.getTransportManager().getSelector(0).unregisterForEvent(this); 441 // Notify anyone waiting. 
442 purgeCalls(wrapper.connectionAbort(ex), true, false); 443 // REVISIT 444 //keepRunning = false; 445 // REVISIT - if this is called after purgeCalls then 446 // the state of the socket is ABORT so the writeLock 447 // in close throws an exception. It is ignored but 448 // causes IBM (screen scraping) tests to fail. 449 //close(); 450 } finally { 451 if (orb.transportDebugFlag) { 452 dprint(".finishReadingBits<-: " + this); 453 } 454 } 455 return null; 456 } 457 458 protected boolean dispatch(CorbaMessageMediator messageMediator) 459 { 460 try { 461 if (orb.transportDebugFlag) { 462 dprint(".dispatch->: " + this); 463 } 464 465 // 466 // NOTE: 467 // 468 // This call is the transition from the tranport block 469 // to the protocol block. 470 // 471 472 boolean result = 473 messageMediator.getProtocolHandler() 474 .handleRequest(messageMediator); 475 476 return result; 477 478 } catch (ThreadDeath td) { 479 if (orb.transportDebugFlag) { 480 dprint(".dispatch: ThreadDeath", td ); 481 } 482 try { 483 purgeCalls(wrapper.connectionAbort(td), false, false); 484 } catch (Throwable t) { 485 if (orb.transportDebugFlag) { 486 dprint(".dispatch: purgeCalls: Throwable", t); 487 } 488 } 489 throw td; 490 } catch (Throwable ex) { 491 if (orb.transportDebugFlag) { 492 dprint(".dispatch: Throwable", ex ) ; 493 } 494 495 try { 496 if (ex instanceof INTERNAL) { 497 sendMessageError(GIOPVersion.DEFAULT_VERSION); 498 } 499 } catch (IOException e) { 500 if (orb.transportDebugFlag) { 501 dprint(".dispatch: sendMessageError: IOException", e); 502 } 503 } 504 purgeCalls(wrapper.connectionAbort(ex), false, false); 505 // REVISIT 506 //keepRunning = false; 507 } finally { 508 if (orb.transportDebugFlag) { 509 dprint(".dispatch<-: " + this); 510 } 511 } 512 513 return true; 514 } 515 516 public boolean shouldUseDirectByteBuffers() 517 { 518 return getSocketChannel() != null; 519 } 520 521 public ByteBuffer read(int size, int offset, int length, long max_wait_time) 522 throws IOException 
523 { 524 if (shouldUseDirectByteBuffers()) { 525 526 ByteBuffer byteBuffer = 527 orb.getByteBufferPool().getByteBuffer(size); 528 529 if (orb.transportDebugFlag) { 530 // print address of ByteBuffer gotten from pool 531 int bbAddress = System.identityHashCode(byteBuffer); 532 StringBuffer sb = new StringBuffer(80); 533 sb.append(".read: got ByteBuffer id ("); 534 sb.append(bbAddress).append(") from ByteBufferPool."); 535 String msgStr = sb.toString(); 536 dprint(msgStr); 537 } 538 539 byteBuffer.position(offset); 540 byteBuffer.limit(size); 541 542 readFully(byteBuffer, length, max_wait_time); 543 544 return byteBuffer; 545 } 546 547 byte[] buf = new byte[size]; 548 readFully(getSocket().getInputStream(), buf, 549 offset, length, max_wait_time); 550 ByteBuffer byteBuffer = ByteBuffer.wrap(buf); 551 byteBuffer.limit(size); 552 return byteBuffer; 553 } 554 555 public ByteBuffer read(ByteBuffer byteBuffer, int offset, 556 int length, long max_wait_time) 557 throws IOException 558 { 559 int size = offset + length; 560 if (shouldUseDirectByteBuffers()) { 561 562 if (! 
byteBuffer.isDirect()) { 563 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket(); 564 } 565 if (size > byteBuffer.capacity()) { 566 if (orb.transportDebugFlag) { 567 // print address of ByteBuffer being released 568 int bbAddress = System.identityHashCode(byteBuffer); 569 StringBuffer bbsb = new StringBuffer(80); 570 bbsb.append(".read: releasing ByteBuffer id (") 571 .append(bbAddress).append(") to ByteBufferPool."); 572 String bbmsg = bbsb.toString(); 573 dprint(bbmsg); 574 } 575 orb.getByteBufferPool().releaseByteBuffer(byteBuffer); 576 byteBuffer = orb.getByteBufferPool().getByteBuffer(size); 577 } 578 byteBuffer.position(offset); 579 byteBuffer.limit(size); 580 readFully(byteBuffer, length, max_wait_time); 581 byteBuffer.position(0); 582 byteBuffer.limit(size); 583 return byteBuffer; 584 } 585 if (byteBuffer.isDirect()) { 586 throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket(); 587 } 588 byte[] buf = new byte[size]; 589 readFully(getSocket().getInputStream(), buf, 590 offset, length, max_wait_time); 591 return ByteBuffer.wrap(buf); 592 } 593 594 public void readFully(ByteBuffer byteBuffer, int size, long max_wait_time) 595 throws IOException 596 { 597 int n = 0; 598 int bytecount = 0; 599 long time_to_wait = readTimeouts.get_initial_time_to_wait(); 600 long total_time_in_wait = 0; 601 602 // The reading of data incorporates a strategy to detect a 603 // rogue client. The strategy is implemented as follows. As 604 // long as data is being read, at least 1 byte or more, we 605 // assume we have a well behaved client. If no data is read, 606 // then we sleep for a time to wait, re-calculate a new time to 607 // wait which is lengthier than the previous time spent waiting. 608 // Then, if the total time spent waiting does not exceed a 609 // maximum time we are willing to wait, we attempt another 610 // read. If the maximum amount of time we are willing to 611 // spend waiting for more data is exceeded, we throw an 612 // IOException. 
613 614 // NOTE: Reading of GIOP headers are treated with a smaller 615 // maximum time to wait threshold. Based on extensive 616 // performance testing, all GIOP headers are being 617 // read in 1 read access. 618 619 do { 620 bytecount = getSocketChannel().read(byteBuffer); 621 622 if (bytecount < 0) { 623 throw new IOException("End-of-stream"); 624 } 625 else if (bytecount == 0) { 626 try { 627 Thread.sleep(time_to_wait); 628 total_time_in_wait += time_to_wait; 629 time_to_wait = 630 (long)(time_to_wait*readTimeouts.get_backoff_factor()); 631 } 632 catch (InterruptedException ie) { 633 // ignore exception 634 if (orb.transportDebugFlag) { 635 dprint("readFully(): unexpected exception " 636 + ie.toString()); 637 } 638 } 639 } 640 else { 641 n += bytecount; 642 } 643 } 644 while (n < size && total_time_in_wait < max_wait_time); 645 646 if (n < size && total_time_in_wait >= max_wait_time) 647 { 648 // failed to read entire message 649 throw wrapper.transportReadTimeoutExceeded(new Integer(size), 650 new Integer(n), new Long(max_wait_time), 651 new Long(total_time_in_wait)); 652 } 653 654 getConnectionCache().stampTime(this); 655 } 656 657 // To support non-channel connections. 658 public void readFully(java.io.InputStream is, byte[] buf, 659 int offset, int size, long max_wait_time) 660 throws IOException 661 { 662 int n = 0; 663 int bytecount = 0; 664 long time_to_wait = readTimeouts.get_initial_time_to_wait(); 665 long total_time_in_wait = 0; 666 667 // The reading of data incorporates a strategy to detect a 668 // rogue client. The strategy is implemented as follows. As 669 // long as data is being read, at least 1 byte or more, we 670 // assume we have a well behaved client. If no data is read, 671 // then we sleep for a time to wait, re-calculate a new time to 672 // wait which is lengthier than the previous time spent waiting. 
673 // Then, if the total time spent waiting does not exceed a 674 // maximum time we are willing to wait, we attempt another 675 // read. If the maximum amount of time we are willing to 676 // spend waiting for more data is exceeded, we throw an 677 // IOException. 678 679 // NOTE: Reading of GIOP headers are treated with a smaller 680 // maximum time to wait threshold. Based on extensive 681 // performance testing, all GIOP headers are being 682 // read in 1 read access. 683 684 do { 685 bytecount = is.read(buf, offset + n, size - n); 686 if (bytecount < 0) { 687 throw new IOException("End-of-stream"); 688 } 689 else if (bytecount == 0) { 690 try { 691 Thread.sleep(time_to_wait); 692 total_time_in_wait += time_to_wait; 693 time_to_wait = 694 (long)(time_to_wait*readTimeouts.get_backoff_factor()); 695 } 696 catch (InterruptedException ie) { 697 // ignore exception 698 if (orb.transportDebugFlag) { 699 dprint("readFully(): unexpected exception " 700 + ie.toString()); 701 } 702 } 703 } 704 else { 705 n += bytecount; 706 } 707 } 708 while (n < size && total_time_in_wait < max_wait_time); 709 710 if (n < size && total_time_in_wait >= max_wait_time) 711 { 712 // failed to read entire message 713 throw wrapper.transportReadTimeoutExceeded(new Integer(size), 714 new Integer(n), new Long(max_wait_time), 715 new Long(total_time_in_wait)); 716 } 717 718 getConnectionCache().stampTime(this); 719 } 720 721 public void write(ByteBuffer byteBuffer) 722 throws IOException 723 { 724 if (shouldUseDirectByteBuffers()) { 725 /* NOTE: cannot perform this test. If one ask for a 726 ByteBuffer from the pool which is bigger than the size 727 of ByteBuffers managed by the pool, then the pool will 728 return a HeapByteBuffer. 729 if (byteBuffer.hasArray()) { 730 throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket(); 731 } 732 */ 733 // IMPORTANT: For non-blocking SocketChannels, there's no guarantee 734 // all bytes are written on first write attempt. 
735 do { 736 getSocketChannel().write(byteBuffer); 737 } 738 while (byteBuffer.hasRemaining()); 739 740 } else { 741 if (! byteBuffer.hasArray()) { 742 throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket(); 743 } 744 byte[] tmpBuf = byteBuffer.array(); 745 getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit()); 746 getSocket().getOutputStream().flush(); 747 } 748 749 // TimeStamp connection to indicate it has been used 750 // Note granularity of connection usage is assumed for 751 // now to be that of a IIOP packet. 752 getConnectionCache().stampTime(this); 753 } 754 755 /** 756 * Note:it is possible for this to be called more than once 757 */ 758 public synchronized void close() 759 { 760 try { 761 if (orb.transportDebugFlag) { 762 dprint(".close->: " + this); 763 } 764 writeLock(); 765 766 // REVISIT It will be good to have a read lock on the reader thread 767 // before we proceed further, to avoid the reader thread (server side) 768 // from processing requests. This avoids the risk that a new request 769 // will be accepted by ReaderThread while the ListenerThread is 770 // attempting to close this connection. 771 772 if (isBusy()) { // we are busy! 773 writeUnlock(); 774 if (orb.transportDebugFlag) { 775 dprint(".close: isBusy so no close: " + this); 776 } 777 return; 778 } 779 780 try { 781 try { 782 sendCloseConnection(GIOPVersion.V1_0); 783 } catch (Throwable t) { 784 wrapper.exceptionWhenSendingCloseConnection(t); 785 } 786 787 synchronized ( stateEvent ){ 788 state = CLOSE_SENT; 789 stateEvent.notifyAll(); 790 } 791 792 // stop the reader without causing it to do purgeCalls 793 //Exception ex = new Exception(); 794 //reader.stop(ex); // REVISIT 795 796 // NOTE: !!!!!! 797 // This does writeUnlock(). 
798 purgeCalls(wrapper.connectionRebind(), false, true); 799 800 } catch (Exception ex) { 801 if (orb.transportDebugFlag) { 802 dprint(".close: exception: " + this, ex); 803 } 804 } 805 try { 806 Selector selector = orb.getTransportManager().getSelector(0); 807 if (selector != null) { 808 selector.unregisterForEvent(this); 809 } 810 if (socketChannel != null) { 811 socketChannel.close(); 812 } 813 socket.close(); 814 } catch (IOException e) { 815 if (orb.transportDebugFlag) { 816 dprint(".close: " + this, e); 817 } 818 } 819 closeConnectionResources(); 820 } finally { 821 if (orb.transportDebugFlag) { 822 dprint(".close<-: " + this); 823 } 824 } 825 } 826 827 public void closeConnectionResources() { 828 if (orb.transportDebugFlag) { 829 dprint(".closeConnectionResources->: " + this); 830 } 831 Selector selector = orb.getTransportManager().getSelector(0); 832 if (selector != null) { 833 selector.unregisterForEvent(this); 834 } 835 try { 836 if (socketChannel != null) 837 socketChannel.close() ; 838 if (socket != null && !socket.isClosed()) 839 socket.close() ; 840 } catch (IOException e) { 841 if (orb.transportDebugFlag) { 842 dprint( ".closeConnectionResources: " + this, e ) ; 843 } 844 } 845 if (orb.transportDebugFlag) { 846 dprint(".closeConnectionResources<-: " + this); 847 } 848 } 849 850 851 public Acceptor getAcceptor() 852 { 853 return acceptor; 854 } 855 856 public ContactInfo getContactInfo() 857 { 858 return contactInfo; 859 } 860 861 public EventHandler getEventHandler() 862 { 863 return this; 864 } 865 866 public OutputObject createOutputObject(MessageMediator messageMediator) 867 { 868 // REVISIT - remove this method from Connection and all it subclasses. 869 throw new RuntimeException("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called."); 870 } 871 872 // This is used by the GIOPOutputObject in order to 873 // throw the correct error when handling code sets. 
874 // Can we determine if we are on the server side by 875 // other means? XREVISIT 876 public boolean isServer() 877 { 878 return isServer; 879 } 880 881 public boolean isBusy() 882 { 883 if (serverRequestCount > 0 || 884 getResponseWaitingRoom().numberRegistered() > 0) 885 { 886 return true; 887 } else { 888 return false; 889 } 890 } 891 892 public long getTimeStamp() 893 { 894 return timeStamp; 895 } 896 897 public void setTimeStamp(long time) 898 { 899 timeStamp = time; 900 } 901 902 public void setState(String stateString) 903 { 904 synchronized (stateEvent) { 905 if (stateString.equals("ESTABLISHED")) { 906 state = ESTABLISHED; 907 stateEvent.notifyAll(); 908 } else { 909 // REVISIT: ASSERT 910 } 911 } 912 } 913 914 /** 915 * Sets the writeLock for this connection. 916 * If the writeLock is already set by someone else, block till the 917 * writeLock is released and can set by us. 918 * IMPORTANT: this connection's lock must be acquired before 919 * setting the writeLock and must be unlocked after setting the writeLock. 920 */ 921 public void writeLock() 922 { 923 try { 924 if (dprintWriteLocks && orb.transportDebugFlag) { 925 dprint(".writeLock->: " + this); 926 } 927 // Keep looping till we can set the writeLock. 
928 while ( true ) { 929 int localState = state; 930 switch ( localState ) { 931 932 case OPENING: 933 synchronized (stateEvent) { 934 if (state != OPENING) { 935 // somebody has changed 'state' so be careful 936 break; 937 } 938 try { 939 stateEvent.wait(); 940 } catch (InterruptedException ie) { 941 if (orb.transportDebugFlag) { 942 dprint(".writeLock: OPENING InterruptedException: " + this); 943 } 944 } 945 } 946 // Loop back 947 break; 948 949 case ESTABLISHED: 950 synchronized (writeEvent) { 951 if (!writeLocked) { 952 writeLocked = true; 953 return; 954 } 955 956 try { 957 // do not stay here too long if state != ESTABLISHED 958 // Bug 4752117 959 while (state == ESTABLISHED && writeLocked) { 960 writeEvent.wait(100); 961 } 962 } catch (InterruptedException ie) { 963 if (orb.transportDebugFlag) { 964 dprint(".writeLock: ESTABLISHED InterruptedException: " + this); 965 } 966 } 967 } 968 // Loop back 969 break; 970 971 // 972 // XXX 973 // Need to distinguish between client and server roles 974 // here probably. 
975 // 976 case ABORT: 977 synchronized ( stateEvent ){ 978 if (state != ABORT) { 979 break; 980 } 981 throw wrapper.writeErrorSend() ; 982 } 983 984 case CLOSE_RECVD: 985 // the connection has been closed or closing 986 // ==> throw rebind exception 987 synchronized ( stateEvent ){ 988 if (state != CLOSE_RECVD) { 989 break; 990 } 991 throw wrapper.connectionCloseRebind() ; 992 } 993 994 default: 995 if (orb.transportDebugFlag) { 996 dprint(".writeLock: default: " + this); 997 } 998 // REVISIT 999 throw new RuntimeException(".writeLock: bad state"); 1000 } 1001 } 1002 } finally { 1003 if (dprintWriteLocks && orb.transportDebugFlag) { 1004 dprint(".writeLock<-: " + this); 1005 } 1006 } 1007 } 1008 1009 public void writeUnlock() 1010 { 1011 try { 1012 if (dprintWriteLocks && orb.transportDebugFlag) { 1013 dprint(".writeUnlock->: " + this); 1014 } 1015 synchronized (writeEvent) { 1016 writeLocked = false; 1017 writeEvent.notify(); // wake up one guy waiting to write 1018 } 1019 } finally { 1020 if (dprintWriteLocks && orb.transportDebugFlag) { 1021 dprint(".writeUnlock<-: " + this); 1022 } 1023 } 1024 } 1025 1026 // Assumes the caller handles writeLock and writeUnlock 1027 public void sendWithoutLock(OutputObject outputObject) 1028 { 1029 // Don't we need to check for CloseConnection 1030 // here? REVISIT 1031 1032 // XREVISIT - Shouldn't the MessageMediator 1033 // be the one to handle writing the data here? 1034 1035 try { 1036 1037 // Write the fragment/message 1038 1039 CDROutputObject cdrOutputObject = (CDROutputObject) outputObject; 1040 cdrOutputObject.writeTo(this); 1041 // REVISIT - no flush? 
1042 //socket.getOutputStream().flush(); 1043 1044 } catch (IOException e1) { 1045 1046 /* 1047 * ADDED(Ram J) 10/13/2000 In the event of an IOException, try 1048 * sending a CancelRequest for regular requests / locate requests 1049 */ 1050 1051 // Since IIOPOutputStream's msgheader is set only once, and not 1052 // altered during sending multiple fragments, the original 1053 // msgheader will always have the requestId. 1054 // REVISIT This could be optimized to send a CancelRequest only 1055 // if any fragments had been sent already. 1056 1057 /* REVISIT: MOVE TO SUBCONTRACT 1058 Message msg = os.getMessage(); 1059 if (msg.getType() == Message.GIOPRequest || 1060 msg.getType() == Message.GIOPLocateRequest) { 1061 GIOPVersion requestVersion = msg.getGIOPVersion(); 1062 int requestId = MessageBase.getRequestId(msg); 1063 try { 1064 sendCancelRequest(requestVersion, requestId); 1065 } catch (IOException e2) { 1066 // most likely an abortive connection closure. 1067 // ignore, since nothing more can be done. 1068 if (orb.transportDebugFlag) { 1069 1070 } 1071 } 1072 */ 1073 1074 // REVISIT When a send failure happens, purgeCalls() need to be 1075 // called to ensure that the connection is properly removed from 1076 // further usage (ie., cancelling pending requests with COMM_FAILURE 1077 // with an appropriate minor_code CompletionStatus.MAY_BE). 1078 1079 // Relying on the IIOPOutputStream (as noted below) is not 1080 // sufficient as it handles COMM_FAILURE only for the final 1081 // fragment (during invoke processing). Note that COMM_FAILURE could 1082 // happen while sending the initial fragments. 1083 // Also the IIOPOutputStream does not properly close the connection. 1084 // It simply removes the connection from the table. An orderly 1085 // closure is needed (ie., cancel pending requests on the connection 1086 // COMM_FAILURE as well. 1087 1088 // IIOPOutputStream will cleanup the connection info when it 1089 // sees this exception. 
1090 SystemException exc = wrapper.writeErrorSend(e1); 1091 purgeCalls(exc, false, true); 1092 throw exc; 1093 } 1094 } 1095 1096 public void registerWaiter(MessageMediator messageMediator) 1097 { 1098 responseWaitingRoom.registerWaiter(messageMediator); 1099 } 1100 1101 public void unregisterWaiter(MessageMediator messageMediator) 1102 { 1103 responseWaitingRoom.unregisterWaiter(messageMediator); 1104 } 1105 1106 public InputObject waitForResponse(MessageMediator messageMediator) 1107 { 1108 return responseWaitingRoom.waitForResponse(messageMediator); 1109 } 1110 1111 public void setConnectionCache(ConnectionCache connectionCache) 1112 { 1113 this.connectionCache = connectionCache; 1114 } 1115 1116 public ConnectionCache getConnectionCache() 1117 { 1118 return connectionCache; 1119 } 1120 1121 //////////////////////////////////////////////////// 1122 // 1123 // EventHandler methods 1124 // 1125 1126 public void setUseSelectThreadToWait(boolean x) 1127 { 1128 useSelectThreadToWait = x; 1129 // REVISIT - Reading of a GIOP header only is information 1130 // that should be passed into the constructor 1131 // from the SocketOrChannelConnection factory. 
1132 setReadGiopHeaderOnly(shouldUseSelectThreadToWait()); 1133 } 1134 1135 public void handleEvent() 1136 { 1137 if (orb.transportDebugFlag) { 1138 dprint(".handleEvent->: " + this); 1139 } 1140 getSelectionKey().interestOps(getSelectionKey().interestOps() & 1141 (~ getInterestOps())); 1142 1143 if (shouldUseWorkerThreadForEvent()) { 1144 Throwable throwable = null; 1145 try { 1146 int poolToUse = 0; 1147 if (shouldReadGiopHeaderOnly()) { 1148 partialMessageMediator = readBits(); 1149 poolToUse = 1150 partialMessageMediator.getThreadPoolToUse(); 1151 } 1152 1153 if (orb.transportDebugFlag) { 1154 dprint(".handleEvent: addWork to pool: " + poolToUse); 1155 } 1156 orb.getThreadPoolManager().getThreadPool(poolToUse) 1157 .getWorkQueue(0).addWork(getWork()); 1158 } catch (NoSuchThreadPoolException e) { 1159 throwable = e; 1160 } catch (NoSuchWorkQueueException e) { 1161 throwable = e; 1162 } 1163 // REVISIT: need to close connection. 1164 if (throwable != null) { 1165 if (orb.transportDebugFlag) { 1166 dprint(".handleEvent: " + throwable); 1167 } 1168 INTERNAL i = new INTERNAL("NoSuchThreadPoolException"); 1169 i.initCause(throwable); 1170 throw i; 1171 } 1172 } else { 1173 if (orb.transportDebugFlag) { 1174 dprint(".handleEvent: doWork"); 1175 } 1176 getWork().doWork(); 1177 } 1178 if (orb.transportDebugFlag) { 1179 dprint(".handleEvent<-: " + this); 1180 } 1181 } 1182 1183 public SelectableChannel getChannel() 1184 { 1185 return socketChannel; 1186 } 1187 1188 public int getInterestOps() 1189 { 1190 return SelectionKey.OP_READ; 1191 } 1192 1193 // public Acceptor getAcceptor() - already defined above. 1194 1195 public Connection getConnection() 1196 { 1197 return this; 1198 } 1199 1200 //////////////////////////////////////////////////// 1201 // 1202 // Work methods. 
1203 // 1204 1205 public String getName() 1206 { 1207 return this.toString(); 1208 } 1209 1210 public void doWork() 1211 { 1212 try { 1213 if (orb.transportDebugFlag) { 1214 dprint(".doWork->: " + this); 1215 } 1216 1217 // IMPORTANT: Sanity checks on SelectionKeys such as 1218 // SelectorKey.isValid() should not be done 1219 // here. 1220 // 1221 1222 if (!shouldReadGiopHeaderOnly()) { 1223 read(); 1224 } 1225 else { 1226 // get the partialMessageMediator 1227 // created by SelectorThread 1228 CorbaMessageMediator messageMediator = 1229 this.getPartialMessageMediator(); 1230 1231 // read remaining info needed in a MessageMediator 1232 messageMediator = finishReadingBits(messageMediator); 1233 1234 if (messageMediator != null) { 1235 // Null can happen when client closes stream 1236 // causing purgecalls. 1237 dispatch(messageMediator); 1238 } 1239 } 1240 } catch (Throwable t) { 1241 if (orb.transportDebugFlag) { 1242 dprint(".doWork: ignoring Throwable: " 1243 + t 1244 + " " + this); 1245 } 1246 } finally { 1247 if (orb.transportDebugFlag) { 1248 dprint(".doWork<-: " + this); 1249 } 1250 } 1251 } 1252 1253 public void setEnqueueTime(long timeInMillis) 1254 { 1255 enqueueTime = timeInMillis; 1256 } 1257 1258 public long getEnqueueTime() 1259 { 1260 return enqueueTime; 1261 } 1262 1263 //////////////////////////////////////////////////// 1264 // 1265 // spi.transport.CorbaConnection. 1266 // 1267 1268 // IMPORTANT: Reader Threads must NOT read Giop header only. 1269 public boolean shouldReadGiopHeaderOnly() { 1270 return shouldReadGiopHeaderOnly; 1271 } 1272 1273 protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly) { 1274 shouldReadGiopHeaderOnly = shouldReadHeaderOnly; 1275 } 1276 1277 public ResponseWaitingRoom getResponseWaitingRoom() 1278 { 1279 return responseWaitingRoom; 1280 } 1281 1282 // REVISIT - inteface defines isServer but already defined in 1283 // higher interface. 
1284 1285 public void serverRequestMapPut(int requestId, 1286 CorbaMessageMediator messageMediator) 1287 { 1288 serverRequestMap.put(new Integer(requestId), messageMediator); 1289 } 1290 1291 public CorbaMessageMediator serverRequestMapGet(int requestId) 1292 { 1293 return (CorbaMessageMediator) 1294 serverRequestMap.get(new Integer(requestId)); 1295 } 1296 1297 public void serverRequestMapRemove(int requestId) 1298 { 1299 serverRequestMap.remove(new Integer(requestId)); 1300 } 1301 1302 1303 // REVISIT: this is also defined in: 1304 // com.sun.corba.se.spi.legacy.connection.Connection 1305 public java.net.Socket getSocket() 1306 { 1307 return socket; 1308 } 1309 1310 /** It is possible for a Close Connection to have been 1311 ** sent here, but we will not check for this. A "lazy" 1312 ** Exception will be thrown in the Worker thread after the 1313 ** incoming request has been processed even though the connection 1314 ** is closed before the request is processed. This is o.k because 1315 ** it is a boundary condition. To prevent it we would have to add 1316 ** more locks which would reduce performance in the normal case. 1317 **/ 1318 public synchronized void serverRequestProcessingBegins() 1319 { 1320 serverRequestCount++; 1321 } 1322 1323 public synchronized void serverRequestProcessingEnds() 1324 { 1325 serverRequestCount--; 1326 } 1327 1328 // 1329 // 1330 // 1331 1332 public synchronized int getNextRequestId() 1333 { 1334 return requestId++; 1335 } 1336 1337 // Negotiated code sets for char and wchar data 1338 protected CodeSetComponentInfo.CodeSetContext codeSetContext = null; 1339 1340 public ORB getBroker() 1341 { 1342 return orb; 1343 } 1344 1345 public CodeSetComponentInfo.CodeSetContext getCodeSetContext() { 1346 // Needs to be synchronized for the following case when the client 1347 // doesn't send the code set context twice, and we have two threads 1348 // in ServerRequestDispatcher processCodeSetContext. 
1349 // 1350 // Thread A checks to see if there is a context, there is none, so 1351 // it calls setCodeSetContext, getting the synch lock. 1352 // Thread B checks to see if there is a context. If we didn't synch, 1353 // it might decide to outlaw wchar/wstring. 1354 if (codeSetContext == null) { 1355 synchronized(this) { 1356 return codeSetContext; 1357 } 1358 } 1359 1360 return codeSetContext; 1361 } 1362 1363 public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) { 1364 // Double check whether or not we need to do this 1365 if (codeSetContext == null) { 1366 1367 if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null || 1368 OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) { 1369 // If the client says it's negotiated a code set that 1370 // isn't a fallback and we never said we support, then 1371 // it has a bug. 1372 throw wrapper.badCodesetsFromClient() ; 1373 } 1374 1375 codeSetContext = csc; 1376 } 1377 } 1378 1379 // 1380 // from iiop.IIOPConnection.java 1381 // 1382 1383 // Map request ID to an InputObject. 1384 // This is so the client thread can start unmarshaling 1385 // the reply and remove it from the out_calls map while the 1386 // ReaderThread can still obtain the input stream to give 1387 // new fragments. Only the ReaderThread touches the clientReplyMap, 1388 // so it doesn't incur synchronization overhead. 
1389 1390 public MessageMediator clientRequestMapGet(int requestId) 1391 { 1392 return responseWaitingRoom.getMessageMediator(requestId); 1393 } 1394 1395 protected MessageMediator clientReply_1_1; 1396 1397 public void clientReply_1_1_Put(MessageMediator x) 1398 { 1399 clientReply_1_1 = x; 1400 } 1401 1402 public MessageMediator clientReply_1_1_Get() 1403 { 1404 return clientReply_1_1; 1405 } 1406 1407 public void clientReply_1_1_Remove() 1408 { 1409 clientReply_1_1 = null; 1410 } 1411 1412 protected MessageMediator serverRequest_1_1; 1413 1414 public void serverRequest_1_1_Put(MessageMediator x) 1415 { 1416 serverRequest_1_1 = x; 1417 } 1418 1419 public MessageMediator serverRequest_1_1_Get() 1420 { 1421 return serverRequest_1_1; 1422 } 1423 1424 public void serverRequest_1_1_Remove() 1425 { 1426 serverRequest_1_1 = null; 1427 } 1428 1429 protected String getStateString( int state ) 1430 { 1431 synchronized ( stateEvent ){ 1432 switch (state) { 1433 case OPENING : return "OPENING" ; 1434 case ESTABLISHED : return "ESTABLISHED" ; 1435 case CLOSE_SENT : return "CLOSE_SENT" ; 1436 case CLOSE_RECVD : return "CLOSE_RECVD" ; 1437 case ABORT : return "ABORT" ; 1438 default : return "???" ; 1439 } 1440 } 1441 } 1442 1443 public synchronized boolean isPostInitialContexts() { 1444 return postInitialContexts; 1445 } 1446 1447 // Can never be unset... 1448 public synchronized void setPostInitialContexts(){ 1449 postInitialContexts = true; 1450 } 1451 1452 /** 1453 * Wake up the outstanding requests on the connection, and hand them 1454 * COMM_FAILURE exception with a given minor code. 1455 * 1456 * Also, delete connection from connection table and 1457 * stop the reader thread. 1458 1459 * Note that this should only ever be called by the Reader thread for 1460 * this connection. 1461 * 1462 * @param minor_code The minor code for the COMM_FAILURE major code. 1463 * @param die Kill the reader thread (this thread) before exiting. 
1464 */ 1465 public void purgeCalls(SystemException systemException, 1466 boolean die, boolean lockHeld) 1467 { 1468 int minor_code = systemException.minor; 1469 1470 try{ 1471 if (orb.transportDebugFlag) { 1472 dprint(".purgeCalls->: " 1473 + minor_code + "/" + die + "/" + lockHeld 1474 + " " + this); 1475 } 1476 1477 // If this invocation is a result of ThreadDeath caused 1478 // by a previous execution of this routine, just exit. 1479 1480 synchronized ( stateEvent ){ 1481 if ((state == ABORT) || (state == CLOSE_RECVD)) { 1482 if (orb.transportDebugFlag) { 1483 dprint(".purgeCalls: exiting since state is: " 1484 + getStateString(state) 1485 + " " + this); 1486 } 1487 return; 1488 } 1489 } 1490 1491 // Grab the writeLock (freeze the calls) 1492 try { 1493 if (!lockHeld) { 1494 writeLock(); 1495 } 1496 } catch (SystemException ex) { 1497 if (orb.transportDebugFlag) 1498 dprint(".purgeCalls: SystemException" + ex 1499 + "; continuing " + this); 1500 } 1501 1502 // Mark the state of the connection 1503 // and determine the request status 1504 org.omg.CORBA.CompletionStatus completion_status; 1505 synchronized ( stateEvent ){ 1506 if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) { 1507 state = CLOSE_RECVD; 1508 systemException.completed = CompletionStatus.COMPLETED_NO; 1509 } else { 1510 state = ABORT; 1511 systemException.completed = CompletionStatus.COMPLETED_MAYBE; 1512 } 1513 stateEvent.notifyAll(); 1514 } 1515 1516 try { 1517 socket.getInputStream().close(); 1518 socket.getOutputStream().close(); 1519 socket.close(); 1520 } catch (Exception ex) { 1521 if (orb.transportDebugFlag) { 1522 dprint(".purgeCalls: Exception closing socket: " + ex 1523 + " " + this); 1524 } 1525 } 1526 1527 // Signal all threads with outstanding requests on this 1528 // connection and give them the SystemException; 1529 1530 responseWaitingRoom.signalExceptionToAllWaiters(systemException); 1531 } finally { 1532 if (contactInfo != null) { 1533 
((OutboundConnectionCache)getConnectionCache()).remove(contactInfo); 1534 } else if (acceptor != null) { 1535 ((InboundConnectionCache)getConnectionCache()).remove(this); 1536 } 1537 1538 // 1539 // REVISIT: Stop the reader thread 1540 // 1541 1542 // Signal all the waiters of the writeLock. 1543 // There are 4 types of writeLock waiters: 1544 // 1. Send waiters: 1545 // 2. SendReply waiters: 1546 // 3. cleanUp waiters: 1547 // 4. purge_call waiters: 1548 // 1549 1550 writeUnlock(); 1551 1552 if (orb.transportDebugFlag) { 1553 dprint(".purgeCalls<-: " 1554 + minor_code + "/" + die + "/" + lockHeld 1555 + " " + this); 1556 } 1557 } 1558 } 1559 1560 /************************************************************************* 1561 * The following methods are for dealing with Connection cleaning for 1562 * better scalability of servers in high network load conditions. 1563 **************************************************************************/ 1564 1565 public void sendCloseConnection(GIOPVersion giopVersion) 1566 throws IOException 1567 { 1568 Message msg = MessageBase.createCloseConnection(giopVersion); 1569 sendHelper(giopVersion, msg); 1570 } 1571 1572 public void sendMessageError(GIOPVersion giopVersion) 1573 throws IOException 1574 { 1575 Message msg = MessageBase.createMessageError(giopVersion); 1576 sendHelper(giopVersion, msg); 1577 } 1578 1579 /** 1580 * Send a CancelRequest message. This does not lock the connection, so the 1581 * caller needs to ensure this method is called appropriately. 1582 * @exception IOException - could be due to abortive connection closure. 
1583 */ 1584 public void sendCancelRequest(GIOPVersion giopVersion, int requestId) 1585 throws IOException 1586 { 1587 1588 Message msg = MessageBase.createCancelRequest(giopVersion, requestId); 1589 sendHelper(giopVersion, msg); 1590 } 1591 1592 protected void sendHelper(GIOPVersion giopVersion, Message msg) 1593 throws IOException 1594 { 1595 // REVISIT: See comments in CDROutputObject constructor. 1596 CDROutputObject outputObject = 1597 sun.corba.OutputStreamFactory.newCDROutputObject((ORB)orb, null, giopVersion, 1598 this, msg, ORBConstants.STREAM_FORMAT_VERSION_1); 1599 msg.write(outputObject); 1600 1601 outputObject.writeTo(this); 1602 } 1603 1604 public void sendCancelRequestWithLock(GIOPVersion giopVersion, 1605 int requestId) 1606 throws IOException 1607 { 1608 writeLock(); 1609 try { 1610 sendCancelRequest(giopVersion, requestId); 1611 } finally { 1612 writeUnlock(); 1613 } 1614 } 1615 1616 // Begin Code Base methods --------------------------------------- 1617 // 1618 // Set this connection's code base IOR. The IOR comes from the 1619 // SendingContext. This is an optional service context, but all 1620 // JavaSoft ORBs send it. 1621 // 1622 // The set and get methods don't need to be synchronized since the 1623 // first possible get would occur during reading a valuetype, and 1624 // that would be after the set. 1625 1626 // Sets this connection's code base IOR. This is done after 1627 // getting the IOR out of the SendingContext service context. 1628 // Our ORBs always send this, but it's optional in CORBA. 1629 1630 public final void setCodeBaseIOR(IOR ior) { 1631 codeBaseServerIOR = ior; 1632 } 1633 1634 public final IOR getCodeBaseIOR() { 1635 return codeBaseServerIOR; 1636 } 1637 1638 // Get a CodeBase stub to use in unmarshaling. The CachedCodeBase 1639 // won't connect to the remote codebase unless it's necessary. 
/**
 * Returns the code base object held by this connection.
 *
 * @return the {@code cachedCodeBase} field
 */
public final CodeBase getCodeBase() {
    return this.cachedCodeBase;
}

// End Code Base methods -----------------------------------------

/**
 * Sets the transport read thresholds.
 *
 * @param readTimeouts the read timeouts to use
 */
protected void setReadTimeouts(ReadTimeouts readTimeouts) {
    this.readTimeouts = readTimeouts;
}

/**
 * Stores the partially read message mediator.
 *
 * @param messageMediator the mediator to remember
 */
protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) {
    this.partialMessageMediator = messageMediator;
}

/**
 * Returns the partially read message mediator.
 *
 * @return the stored partial mediator
 */
protected CorbaMessageMediator getPartialMessageMediator() {
    return this.partialMessageMediator;
}

/**
 * Renders this connection for debug output: the socket channel (or the
 * socket when there is no channel), the state name and the three mode
 * flags, separated by single spaces and wrapped in
 * {@code SocketOrChannelConnectionImpl[ ... ]}. The text is assembled
 * while holding the {@code stateEvent} monitor.
 *
 * @return the textual description of this connection
 */
public String toString()
{
    synchronized (stateEvent) {
        String endpoint;
        if (socketChannel != null) {
            endpoint = socketChannel.toString();
        } else {
            endpoint = socket.toString();
        }

        StringBuilder text = new StringBuilder();
        text.append("SocketOrChannelConnectionImpl[").append(" ");
        text.append(endpoint).append(" ");
        text.append(getStateString(state)).append(" ");
        text.append(shouldUseSelectThreadToWait()).append(" ");
        text.append(shouldUseWorkerThreadForEvent()).append(" ");
        text.append(shouldReadGiopHeaderOnly());
        text.append("]");
        return text.toString();
    }
}

/**
 * Emits a debug message tagged with this class's name.
 * Must be public - used in encoding.
 *
 * @param msg the message to print
 */
public void dprint(String msg)
{
    ORBUtility.dprint("SocketOrChannelConnectionImpl", msg);
}

/**
 * Emits a debug message, then prints the stack trace of the given
 * throwable to {@code System.out}.
 *
 * @param msg the message to print
 * @param t   the throwable whose stack trace is printed
 */
protected void dprint(String msg, Throwable t)
{
    this.dprint(msg);
    t.printStackTrace(System.out);
}
}

// End of file.