1/* 2 * Copyright (c) 2002, 2012, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24import java.net.*; 25import java.io.*; 26import java.nio.*; 27import java.nio.channels.*; 28import sun.net.www.MessageHeader; 29import java.util.*; 30import javax.net.ssl.*; 31import javax.net.ssl.SSLEngineResult.*; 32import java.security.*; 33 34/** 35 * This class implements a simple HTTPS server. It uses multiple threads to 36 * handle connections in parallel, and will spin off a new thread to handle 37 * each request. (this is easier to implement with SSLEngine) 38 * <p> 39 * It must be instantiated with a {@link HttpCallback} object to which 40 * requests are given and must be handled. 41 * <p> 42 * Simple synchronization between the client(s) and server can be done 43 * using the {@link #waitForCondition(String)}, {@link #setCondition(String)} and 44 * {@link #rendezvous(String,int)} methods. 45 * 46 * NOTE NOTE NOTE NOTE NOTE NOTE NOTE 47 * 48 * If you make a change in here, please don't forget to make the 49 * corresponding change in the J2SE equivalent. 50 * 51 * NOTE NOTE NOTE NOTE NOTE NOTE NOTE 52 */ 53 54public class TestHttpsServer { 55 56 ServerSocketChannel schan; 57 int threads; 58 int cperthread; 59 HttpCallback cb; 60 Server[] servers; 61 62 // ssl related fields 63 static SSLContext sslCtx; 64 65 /** 66 * Create a <code>TestHttpsServer<code> instance with the specified callback object 67 * for handling requests. One thread is created to handle requests, 68 * and up to ten TCP connections will be handled simultaneously. 69 * @param cb the callback object which is invoked to handle each 70 * incoming request 71 */ 72 73 public TestHttpsServer (HttpCallback cb) throws IOException { 74 this (cb, 1, 10, 0); 75 } 76 77 /** 78 * Create a <code>TestHttpsServer<code> instance with the specified number of 79 * threads and maximum number of connections per thread. This functions 80 * the same as the 4 arg constructor, where the port argument is set to zero. 81 * @param cb the callback object which is invoked to handle each 82 * incoming request 83 * @param threads the number of threads to create to handle requests 84 * in parallel 85 * @param cperthread the number of simultaneous TCP connections to 86 * handle per thread 87 */ 88 89 public TestHttpsServer (HttpCallback cb, int threads, int cperthread) 90 throws IOException { 91 this (cb, threads, cperthread, 0); 92 } 93 94 /** 95 * Create a <code>TestHttpsServer<code> instance with the specified number 96 * of threads and maximum number of connections per thread and running on 97 * the specified port. The specified number of threads are created to 98 * handle incoming requests, and each thread is allowed 99 * to handle a number of simultaneous TCP connections. 100 * @param cb the callback object which is invoked to handle 101 * each incoming request 102 * @param threads the number of threads to create to handle 103 * requests in parallel 104 * @param cperthread the number of simultaneous TCP connections 105 * to handle per thread 106 * @param port the port number to bind the server to. <code>Zero</code> 107 * means choose any free port. 108 */ 109 110 public TestHttpsServer (HttpCallback cb, int threads, int cperthread, int port) 111 throws IOException { 112 schan = ServerSocketChannel.open (); 113 InetSocketAddress addr = new InetSocketAddress (port); 114 schan.socket().bind (addr); 115 this.threads = threads; 116 this.cb = cb; 117 this.cperthread = cperthread; 118 119 try { 120 // create and initialize a SSLContext 121 KeyStore ks = KeyStore.getInstance("JKS"); 122 KeyStore ts = KeyStore.getInstance("JKS"); 123 char[] passphrase = "passphrase".toCharArray(); 124 125 ks.load(new FileInputStream(System.getProperty("javax.net.ssl.keyStore")), passphrase); 126 ts.load(new FileInputStream(System.getProperty("javax.net.ssl.trustStore")), passphrase); 127 128 KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); 129 kmf.init(ks, passphrase); 130 131 TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); 132 tmf.init(ts); 133 134 sslCtx = SSLContext.getInstance("TLS"); 135 136 sslCtx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); 137 138 servers = new Server [threads]; 139 for (int i=0; i<threads; i++) { 140 servers[i] = new Server (cb, schan, cperthread); 141 servers[i].start(); 142 } 143 } catch (Exception ex) { 144 throw new RuntimeException("test failed. cause: "+ex.getMessage()); 145 } 146 } 147 148 /** Tell all threads in the server to exit within 5 seconds. 149 * This is an abortive termination. Just prior to the thread exiting 150 * all channels in that thread waiting to be closed are forceably closed. 151 */ 152 153 public void terminate () { 154 for (int i=0; i<threads; i++) { 155 servers[i].terminate (); 156 } 157 } 158 159 /** 160 * return the local port number to which the server is bound. 161 * @return the local port number 162 */ 163 164 public int getLocalPort () { 165 return schan.socket().getLocalPort (); 166 } 167 168 static class Server extends Thread { 169 170 ServerSocketChannel schan; 171 Selector selector; 172 SelectionKey listenerKey; 173 SelectionKey key; /* the current key being processed */ 174 HttpCallback cb; 175 ByteBuffer consumeBuffer; 176 int maxconn; 177 int nconn; 178 ClosedChannelList clist; 179 boolean shutdown; 180 181 Server (HttpCallback cb, ServerSocketChannel schan, int maxconn) { 182 this.schan = schan; 183 this.maxconn = maxconn; 184 this.cb = cb; 185 nconn = 0; 186 consumeBuffer = ByteBuffer.allocate (512); 187 clist = new ClosedChannelList (); 188 try { 189 selector = Selector.open (); 190 schan.configureBlocking (false); 191 listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT); 192 } catch (IOException e) { 193 System.err.println ("Server could not start: " + e); 194 } 195 } 196 197 /* Stop the thread as soon as possible */ 198 public synchronized void terminate () { 199 shutdown = true; 200 } 201 202 public void run () { 203 try { 204 while (true) { 205 selector.select (1000); 206 Set selected = selector.selectedKeys(); 207 Iterator iter = selected.iterator(); 208 while (iter.hasNext()) { 209 key = (SelectionKey)iter.next(); 210 if (key.equals (listenerKey)) { 211 SocketChannel sock = schan.accept (); 212 if (sock == null) { 213 /* false notification */ 214 iter.remove(); 215 continue; 216 } 217 sock.configureBlocking (true); 218 SSLEngine sslEng = sslCtx.createSSLEngine(); 219 sslEng.setUseClientMode(false); 220 new ServerWorker(cb, sock, sslEng).start(); 221 nconn ++; 222 if (nconn == maxconn) { 223 /* deregister */ 224 listenerKey.cancel (); 225 listenerKey = null; 226 } 227 } else { 228 if (key.isReadable()) { 229 boolean closed = false; 230 SocketChannel chan = (SocketChannel) key.channel(); 231 if (key.attachment() != null) { 232 closed = consume (chan); 233 } 234 235 if (closed) { 236 chan.close (); 237 key.cancel (); 238 if (nconn == maxconn) { 239 listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT); 240 } 241 nconn --; 242 } 243 } 244 } 245 iter.remove(); 246 } 247 clist.check(); 248 249 synchronized (this) { 250 if (shutdown) { 251 clist.terminate (); 252 return; 253 } 254 } 255 } 256 } catch (IOException e) { 257 System.out.println ("Server exception: " + e); 258 // TODO finish 259 } 260 } 261 262 /* read all the data off the channel without looking at it 263 * return true if connection closed 264 */ 265 boolean consume (SocketChannel chan) { 266 try { 267 consumeBuffer.clear (); 268 int c = chan.read (consumeBuffer); 269 if (c == -1) 270 return true; 271 } catch (IOException e) { 272 return true; 273 } 274 return false; 275 } 276 } 277 278 static class ServerWorker extends Thread { 279 private ByteBuffer inNetBB; 280 private ByteBuffer outNetBB; 281 private ByteBuffer inAppBB; 282 private ByteBuffer outAppBB; 283 284 SSLEngine sslEng; 285 SocketChannel schan; 286 HttpCallback cb; 287 HandshakeStatus currentHSStatus; 288 boolean initialHSComplete; 289 /* 290 * All inbound data goes through this buffer. 291 * 292 * It might be nice to use a cache of ByteBuffers so we're 293 * not alloc/dealloc'ing all over the place. 294 */ 295 296 /* 297 * Application buffers, also used for handshaking 298 */ 299 private int appBBSize; 300 301 ServerWorker (HttpCallback cb, SocketChannel schan, SSLEngine sslEng) { 302 this.sslEng = sslEng; 303 this.schan = schan; 304 this.cb = cb; 305 currentHSStatus = HandshakeStatus.NEED_UNWRAP; 306 initialHSComplete = false; 307 int netBBSize = sslEng.getSession().getPacketBufferSize(); 308 inNetBB = ByteBuffer.allocate(netBBSize); 309 outNetBB = ByteBuffer.allocate(netBBSize); 310 appBBSize = sslEng.getSession().getApplicationBufferSize(); 311 inAppBB = ByteBuffer.allocate(appBBSize); 312 outAppBB = ByteBuffer.allocate(appBBSize); 313 } 314 315 public SSLEngine getSSLEngine() { 316 return sslEng; 317 } 318 319 public ByteBuffer outNetBB() { 320 return outNetBB; 321 } 322 323 public ByteBuffer outAppBB() { 324 return outAppBB; 325 } 326 327 public void run () { 328 try { 329 SSLEngineResult result; 330 331 while (!initialHSComplete) { 332 333 switch (currentHSStatus) { 334 335 case NEED_UNWRAP: 336 int bytes = schan.read(inNetBB); 337 338needIO: 339 while (currentHSStatus == HandshakeStatus.NEED_UNWRAP) { 340 /* 341 * Don't need to resize requestBB, since no app data should 342 * be generated here. 343 */ 344 inNetBB.flip(); 345 result = sslEng.unwrap(inNetBB, inAppBB); 346 inNetBB.compact(); 347 currentHSStatus = result.getHandshakeStatus(); 348 349 switch (result.getStatus()) { 350 351 case OK: 352 switch (currentHSStatus) { 353 case NOT_HANDSHAKING: 354 throw new IOException( 355 "Not handshaking during initial handshake"); 356 357 case NEED_TASK: 358 Runnable task; 359 while ((task = sslEng.getDelegatedTask()) != null) { 360 task.run(); 361 currentHSStatus = sslEng.getHandshakeStatus(); 362 } 363 break; 364 } 365 366 break; 367 368 case BUFFER_UNDERFLOW: 369 break needIO; 370 371 default: // BUFFER_OVERFLOW/CLOSED: 372 throw new IOException("Received" + result.getStatus() + 373 "during initial handshaking"); 374 } 375 } 376 377 /* 378 * Just transitioned from read to write. 379 */ 380 if (currentHSStatus != HandshakeStatus.NEED_WRAP) { 381 break; 382 } 383 384 // Fall through and fill the write buffer. 385 386 case NEED_WRAP: 387 /* 388 * The flush above guarantees the out buffer to be empty 389 */ 390 outNetBB.clear(); 391 result = sslEng.wrap(inAppBB, outNetBB); 392 outNetBB.flip(); 393 schan.write (outNetBB); 394 outNetBB.compact(); 395 currentHSStatus = result.getHandshakeStatus(); 396 397 switch (result.getStatus()) { 398 case OK: 399 400 if (currentHSStatus == HandshakeStatus.NEED_TASK) { 401 Runnable task; 402 while ((task = sslEng.getDelegatedTask()) != null) { 403 task.run(); 404 currentHSStatus = sslEng.getHandshakeStatus(); 405 } 406 } 407 408 break; 409 410 default: // BUFFER_OVERFLOW/BUFFER_UNDERFLOW/CLOSED: 411 throw new IOException("Received" + result.getStatus() + 412 "during initial handshaking"); 413 } 414 break; 415 416 case FINISHED: 417 initialHSComplete = true; 418 break; 419 default: // NOT_HANDSHAKING/NEED_TASK 420 throw new RuntimeException("Invalid Handshaking State" + 421 currentHSStatus); 422 } // switch 423 } 424 // read the application data; using non-blocking mode 425 schan.configureBlocking(false); 426 read(schan, sslEng); 427 } catch (Exception ex) { 428 throw new RuntimeException(ex); 429 } 430 } 431 432 /* return true if the connection is closed, false otherwise */ 433 434 private boolean read (SocketChannel chan, SSLEngine sslEng) { 435 HttpTransaction msg; 436 boolean res; 437 try { 438 InputStream is = new BufferedInputStream (new NioInputStream (chan, sslEng, inNetBB, inAppBB)); 439 String requestline = readLine (is); 440 MessageHeader mhead = new MessageHeader (is); 441 String clen = mhead.findValue ("Content-Length"); 442 String trferenc = mhead.findValue ("Transfer-Encoding"); 443 String data = null; 444 if (trferenc != null && trferenc.equals ("chunked")) 445 data = new String (readChunkedData (is)); 446 else if (clen != null) 447 data = new String (readNormalData (is, Integer.parseInt (clen))); 448 String[] req = requestline.split (" "); 449 if (req.length < 2) { 450 /* invalid request line */ 451 return false; 452 } 453 String cmd = req[0]; 454 URI uri = null; 455 try { 456 uri = new URI (req[1]); 457 msg = new HttpTransaction (this, cmd, uri, mhead, data, null, chan); 458 cb.request (msg); 459 } catch (URISyntaxException e) { 460 System.err.println ("Invalid URI: " + e); 461 msg = new HttpTransaction (this, cmd, null, null, null, null, chan); 462 msg.sendResponse (501, "Whatever"); 463 } 464 res = false; 465 } catch (IOException e) { 466 res = true; 467 } 468 return res; 469 } 470 471 byte[] readNormalData (InputStream is, int len) throws IOException { 472 byte [] buf = new byte [len]; 473 int c, off=0, remain=len; 474 while (remain > 0 && ((c=is.read (buf, off, remain))>0)) { 475 remain -= c; 476 off += c; 477 } 478 return buf; 479 } 480 481 private void readCRLF(InputStream is) throws IOException { 482 int cr = is.read(); 483 int lf = is.read(); 484 485 if (((cr & 0xff) != 0x0d) || 486 ((lf & 0xff) != 0x0a)) { 487 throw new IOException( 488 "Expected <CR><LF>: got '" + cr + "/" + lf + "'"); 489 } 490 } 491 492 byte[] readChunkedData (InputStream is) throws IOException { 493 LinkedList l = new LinkedList (); 494 int total = 0; 495 for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) { 496 l.add (readNormalData(is, len)); 497 total += len; 498 readCRLF(is); // CRLF at end of chunk 499 } 500 readCRLF(is); // CRLF at end of Chunked Stream. 501 byte[] buf = new byte [total]; 502 Iterator i = l.iterator(); 503 int x = 0; 504 while (i.hasNext()) { 505 byte[] b = (byte[])i.next(); 506 System.arraycopy (b, 0, buf, x, b.length); 507 x += b.length; 508 } 509 return buf; 510 } 511 512 private int readChunkLen (InputStream is) throws IOException { 513 int c, len=0; 514 boolean done=false, readCR=false; 515 while (!done) { 516 c = is.read (); 517 if (c == '\n' && readCR) { 518 done = true; 519 } else { 520 if (c == '\r' && !readCR) { 521 readCR = true; 522 } else { 523 int x=0; 524 if (c >= 'a' && c <= 'f') { 525 x = c - 'a' + 10; 526 } else if (c >= 'A' && c <= 'F') { 527 x = c - 'A' + 10; 528 } else if (c >= '0' && c <= '9') { 529 x = c - '0'; 530 } 531 len = len * 16 + x; 532 } 533 } 534 } 535 return len; 536 } 537 538 private String readLine (InputStream is) throws IOException { 539 boolean done=false, readCR=false; 540 byte[] b = new byte [512]; 541 int c, l = 0; 542 543 while (!done) { 544 c = is.read (); 545 if (c == '\n' && readCR) { 546 done = true; 547 } else { 548 if (c == '\r' && !readCR) { 549 readCR = true; 550 } else { 551 b[l++] = (byte)c; 552 } 553 } 554 } 555 return new String (b); 556 } 557 558 /** close the channel associated with the current key by: 559 * 1. shutdownOutput (send a FIN) 560 * 2. mark the key so that incoming data is to be consumed and discarded 561 * 3. After a period, close the socket 562 */ 563 564 synchronized void orderlyCloseChannel (SocketChannel ch) throws IOException { 565 ch.socket().shutdownOutput(); 566 } 567 568 synchronized void abortiveCloseChannel (SocketChannel ch) throws IOException { 569 Socket s = ch.socket (); 570 s.setSoLinger (true, 0); 571 ch.close(); 572 } 573 } 574 575 576 /** 577 * Implements blocking reading semantics on top of a non-blocking channel 578 */ 579 580 static class NioInputStream extends InputStream { 581 SSLEngine sslEng; 582 SocketChannel channel; 583 Selector selector; 584 ByteBuffer inNetBB; 585 ByteBuffer inAppBB; 586 SelectionKey key; 587 int available; 588 byte[] one; 589 boolean closed; 590 ByteBuffer markBuf; /* reads may be satisifed from this buffer */ 591 boolean marked; 592 boolean reset; 593 int readlimit; 594 595 public NioInputStream (SocketChannel chan, SSLEngine sslEng, ByteBuffer inNetBB, ByteBuffer inAppBB) throws IOException { 596 this.sslEng = sslEng; 597 this.channel = chan; 598 selector = Selector.open(); 599 this.inNetBB = inNetBB; 600 this.inAppBB = inAppBB; 601 key = chan.register (selector, SelectionKey.OP_READ); 602 available = 0; 603 one = new byte[1]; 604 closed = marked = reset = false; 605 } 606 607 public synchronized int read (byte[] b) throws IOException { 608 return read (b, 0, b.length); 609 } 610 611 public synchronized int read () throws IOException { 612 return read (one, 0, 1); 613 } 614 615 public synchronized int read (byte[] b, int off, int srclen) throws IOException { 616 617 int canreturn, willreturn; 618 619 if (closed) 620 return -1; 621 622 if (reset) { /* satisfy from markBuf */ 623 canreturn = markBuf.remaining (); 624 willreturn = canreturn>srclen ? srclen : canreturn; 625 markBuf.get(b, off, willreturn); 626 if (canreturn == willreturn) { 627 reset = false; 628 } 629 } else { /* satisfy from channel */ 630 canreturn = available(); 631 if (canreturn == 0) { 632 block (); 633 canreturn = available(); 634 } 635 willreturn = canreturn>srclen ? srclen : canreturn; 636 inAppBB.get(b, off, willreturn); 637 available -= willreturn; 638 639 if (marked) { /* copy into markBuf */ 640 try { 641 markBuf.put (b, off, willreturn); 642 } catch (BufferOverflowException e) { 643 marked = false; 644 } 645 } 646 } 647 return willreturn; 648 } 649 650 public synchronized int available () throws IOException { 651 if (closed) 652 throw new IOException ("Stream is closed"); 653 654 if (reset) 655 return markBuf.remaining(); 656 657 if (available > 0) 658 return available; 659 660 inAppBB.clear (); 661 int bytes = channel.read (inNetBB); 662 663 int needed = sslEng.getSession().getApplicationBufferSize(); 664 if (needed > inAppBB.remaining()) { 665 inAppBB = ByteBuffer.allocate(needed); 666 } 667 inNetBB.flip(); 668 SSLEngineResult result = sslEng.unwrap(inNetBB, inAppBB); 669 inNetBB.compact(); 670 available = result.bytesProduced(); 671 672 if (available > 0) 673 inAppBB.flip(); 674 else if (available == -1) 675 throw new IOException ("Stream is closed"); 676 return available; 677 } 678 679 /** 680 * block() only called when available==0 and buf is empty 681 */ 682 private synchronized void block () throws IOException { 683 //assert available == 0; 684 int n = selector.select (); 685 //assert n == 1; 686 selector.selectedKeys().clear(); 687 available (); 688 } 689 690 public void close () throws IOException { 691 if (closed) 692 return; 693 channel.close (); 694 closed = true; 695 } 696 697 public synchronized void mark (int readlimit) { 698 if (closed) 699 return; 700 this.readlimit = readlimit; 701 markBuf = ByteBuffer.allocate (readlimit); 702 marked = true; 703 reset = false; 704 } 705 706 public synchronized void reset () throws IOException { 707 if (closed ) 708 return; 709 if (!marked) 710 throw new IOException ("Stream not marked"); 711 marked = false; 712 reset = true; 713 markBuf.flip (); 714 } 715 } 716 717 static class NioOutputStream extends OutputStream { 718 SSLEngine sslEng; 719 SocketChannel channel; 720 ByteBuffer outNetBB; 721 ByteBuffer outAppBB; 722 SelectionKey key; 723 Selector selector; 724 boolean closed; 725 byte[] one; 726 727 public NioOutputStream (SocketChannel channel, SSLEngine sslEng, ByteBuffer outNetBB, ByteBuffer outAppBB) throws IOException { 728 this.sslEng = sslEng; 729 this.channel = channel; 730 this.outNetBB = outNetBB; 731 this.outAppBB = outAppBB; 732 selector = Selector.open (); 733 key = channel.register (selector, SelectionKey.OP_WRITE); 734 closed = false; 735 one = new byte [1]; 736 } 737 738 public synchronized void write (int b) throws IOException { 739 one[0] = (byte)b; 740 write (one, 0, 1); 741 } 742 743 public synchronized void write (byte[] b) throws IOException { 744 write (b, 0, b.length); 745 } 746 747 public synchronized void write (byte[] b, int off, int len) throws IOException { 748 if (closed) 749 throw new IOException ("stream is closed"); 750 751 outAppBB = ByteBuffer.allocate (len); 752 outAppBB.put (b, off, len); 753 outAppBB.flip (); 754 int n; 755 outNetBB.clear(); 756 int needed = sslEng.getSession().getPacketBufferSize(); 757 if (outNetBB.capacity() < needed) { 758 outNetBB = ByteBuffer.allocate(needed); 759 } 760 SSLEngineResult ret = sslEng.wrap(outAppBB, outNetBB); 761 outNetBB.flip(); 762 int newLen = ret.bytesProduced(); 763 while ((n = channel.write (outNetBB)) < newLen) { 764 newLen -= n; 765 if (newLen == 0) 766 return; 767 selector.select (); 768 selector.selectedKeys().clear (); 769 } 770 } 771 772 public void close () throws IOException { 773 if (closed) 774 return; 775 channel.close (); 776 closed = true; 777 } 778 } 779 780 /** 781 * Utilities for synchronization. A condition is 782 * identified by a string name, and is initialized 783 * upon first use (ie. setCondition() or waitForCondition()). Threads 784 * are blocked until some thread calls (or has called) setCondition() for the same 785 * condition. 786 * <P> 787 * A rendezvous built on a condition is also provided for synchronizing 788 * N threads. 789 */ 790 791 private static HashMap conditions = new HashMap(); 792 793 /* 794 * Modifiable boolean object 795 */ 796 private static class BValue { 797 boolean v; 798 } 799 800 /* 801 * Modifiable int object 802 */ 803 private static class IValue { 804 int v; 805 IValue (int i) { 806 v =i; 807 } 808 } 809 810 811 private static BValue getCond (String condition) { 812 synchronized (conditions) { 813 BValue cond = (BValue) conditions.get (condition); 814 if (cond == null) { 815 cond = new BValue(); 816 conditions.put (condition, cond); 817 } 818 return cond; 819 } 820 } 821 822 /** 823 * Set the condition to true. Any threads that are currently blocked 824 * waiting on the condition, will be unblocked and allowed to continue. 825 * Threads that subsequently call waitForCondition() will not block. 826 * If the named condition did not exist prior to the call, then it is created 827 * first. 828 */ 829 830 public static void setCondition (String condition) { 831 BValue cond = getCond (condition); 832 synchronized (cond) { 833 if (cond.v) { 834 return; 835 } 836 cond.v = true; 837 cond.notifyAll(); 838 } 839 } 840 841 /** 842 * If the named condition does not exist, then it is created and initialized 843 * to false. If the condition exists or has just been created and its value 844 * is false, then the thread blocks until another thread sets the condition. 845 * If the condition exists and is already set to true, then this call returns 846 * immediately without blocking. 847 */ 848 849 public static void waitForCondition (String condition) { 850 BValue cond = getCond (condition); 851 synchronized (cond) { 852 if (!cond.v) { 853 try { 854 cond.wait(); 855 } catch (InterruptedException e) {} 856 } 857 } 858 } 859 860 /* conditions must be locked when accessing this */ 861 static HashMap rv = new HashMap(); 862 863 /** 864 * Force N threads to rendezvous (ie. wait for each other) before proceeding. 865 * The first thread(s) to call are blocked until the last 866 * thread makes the call. Then all threads continue. 867 * <p> 868 * All threads that call with the same condition name, must use the same value 869 * for N (or the results may be not be as expected). 870 * <P> 871 * Obviously, if fewer than N threads make the rendezvous then the result 872 * will be a hang. 873 */ 874 875 public static void rendezvous (String condition, int N) { 876 BValue cond; 877 IValue iv; 878 String name = "RV_"+condition; 879 880 /* get the condition */ 881 882 synchronized (conditions) { 883 cond = (BValue)conditions.get (name); 884 if (cond == null) { 885 /* we are first caller */ 886 if (N < 2) { 887 throw new RuntimeException ("rendezvous must be called with N >= 2"); 888 } 889 cond = new BValue (); 890 conditions.put (name, cond); 891 iv = new IValue (N-1); 892 rv.put (name, iv); 893 } else { 894 /* already initialised, just decrement the counter */ 895 iv = (IValue) rv.get (name); 896 iv.v --; 897 } 898 } 899 900 if (iv.v > 0) { 901 waitForCondition (name); 902 } else { 903 setCondition (name); 904 synchronized (conditions) { 905 clearCondition (name); 906 rv.remove (name); 907 } 908 } 909 } 910 911 /** 912 * If the named condition exists and is set then remove it, so it can 913 * be re-initialized and used again. If the condition does not exist, or 914 * exists but is not set, then the call returns without doing anything. 915 * Note, some higher level synchronization 916 * may be needed between clear and the other operations. 917 */ 918 919 public static void clearCondition(String condition) { 920 BValue cond; 921 synchronized (conditions) { 922 cond = (BValue) conditions.get (condition); 923 if (cond == null) { 924 return; 925 } 926 synchronized (cond) { 927 if (cond.v) { 928 conditions.remove (condition); 929 } 930 } 931 } 932 } 933} 934