// TestHttpServer.java revision 6073:cea72c2bf071
1/* 2 * Copyright (c) 2002, 2012, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24import java.net.*; 25import java.io.*; 26import java.nio.*; 27import java.nio.channels.*; 28import sun.net.www.MessageHeader; 29import java.util.*; 30 31/** 32 * This class implements a simple HTTP server. It uses multiple threads to 33 * handle connections in parallel, and also multiple connections/requests 34 * can be handled per thread. 35 * <p> 36 * It must be instantiated with a {@link HttpCallback} object to which 37 * requests are given and must be handled. 38 * <p> 39 * Simple synchronization between the client(s) and server can be done 40 * using the {@link #waitForCondition(String)}, {@link #setCondition(String)} and 41 * {@link #rendezvous(String,int)} methods. 42 * 43 * NOTE NOTE NOTE NOTE NOTE NOTE NOTE 44 * 45 * If changes are made here, please sure they are propagated to 46 * the HTTPS equivalent in the JSSE regression test suite. 
47 * 48 * NOTE NOTE NOTE NOTE NOTE NOTE NOTE 49 */ 50 51public class TestHttpServer { 52 53 ServerSocketChannel schan; 54 int threads; 55 int cperthread; 56 HttpCallback cb; 57 Server[] servers; 58 59 /** 60 * Create a <code>TestHttpServer<code> instance with the specified callback object 61 * for handling requests. One thread is created to handle requests, 62 * and up to ten TCP connections will be handled simultaneously. 63 * @param cb the callback object which is invoked to handle each 64 * incoming request 65 */ 66 67 public TestHttpServer (HttpCallback cb) throws IOException { 68 this (cb, 1, 10, 0); 69 } 70 71 /** 72 * Create a <code>TestHttpServer<code> instance with the specified number of 73 * threads and maximum number of connections per thread. This functions 74 * the same as the 4 arg constructor, where the port argument is set to zero. 75 * @param cb the callback object which is invoked to handle each 76 * incoming request 77 * @param threads the number of threads to create to handle requests 78 * in parallel 79 * @param cperthread the number of simultaneous TCP connections to 80 * handle per thread 81 */ 82 83 public TestHttpServer (HttpCallback cb, int threads, int cperthread) 84 throws IOException { 85 this (cb, threads, cperthread, 0); 86 } 87 88 /** 89 * Create a <code>TestHttpServer<code> instance with the specified number 90 * of threads and maximum number of connections per thread and running on 91 * the specified port. The specified number of threads are created to 92 * handle incoming requests, and each thread is allowed 93 * to handle a number of simultaneous TCP connections. 94 * @param cb the callback object which is invoked to handle 95 * each incoming request 96 * @param threads the number of threads to create to handle 97 * requests in parallel 98 * @param cperthread the number of simultaneous TCP connections 99 * to handle per thread 100 * @param port the port number to bind the server to. 
<code>Zero</code> 101 * means choose any free port. 102 */ 103 104 public TestHttpServer (HttpCallback cb, int threads, int cperthread, int port) 105 throws IOException { 106 schan = ServerSocketChannel.open (); 107 InetSocketAddress addr = new InetSocketAddress (port); 108 schan.socket().bind (addr); 109 this.threads = threads; 110 this.cb = cb; 111 this.cperthread = cperthread; 112 servers = new Server [threads]; 113 for (int i=0; i<threads; i++) { 114 servers[i] = new Server (cb, schan, cperthread); 115 servers[i].start(); 116 } 117 } 118 119 /** Tell all threads in the server to exit within 5 seconds. 120 * This is an abortive termination. Just prior to the thread exiting 121 * all channels in that thread waiting to be closed are forceably closed. 122 */ 123 124 public void terminate () { 125 for (int i=0; i<threads; i++) { 126 servers[i].terminate (); 127 } 128 } 129 130 /** 131 * return the local port number to which the server is bound. 132 * @return the local port number 133 */ 134 135 public int getLocalPort () { 136 return schan.socket().getLocalPort (); 137 } 138 139 static class Server extends Thread { 140 141 ServerSocketChannel schan; 142 Selector selector; 143 SelectionKey listenerKey; 144 SelectionKey key; /* the current key being processed */ 145 HttpCallback cb; 146 ByteBuffer consumeBuffer; 147 int maxconn; 148 int nconn; 149 ClosedChannelList clist; 150 boolean shutdown; 151 152 Server (HttpCallback cb, ServerSocketChannel schan, int maxconn) { 153 this.schan = schan; 154 this.maxconn = maxconn; 155 this.cb = cb; 156 nconn = 0; 157 consumeBuffer = ByteBuffer.allocate (512); 158 clist = new ClosedChannelList (); 159 try { 160 selector = Selector.open (); 161 schan.configureBlocking (false); 162 listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT); 163 } catch (IOException e) { 164 System.err.println ("Server could not start: " + e); 165 } 166 } 167 168 /* Stop the thread as soon as possible */ 169 public synchronized void terminate () 
{ 170 shutdown = true; 171 } 172 173 public void run () { 174 try { 175 while (true) { 176 selector.select (1000); 177 Set selected = selector.selectedKeys(); 178 Iterator iter = selected.iterator(); 179 while (iter.hasNext()) { 180 key = (SelectionKey)iter.next(); 181 if (key.equals (listenerKey)) { 182 SocketChannel sock = schan.accept (); 183 if (sock == null) { 184 /* false notification */ 185 iter.remove(); 186 continue; 187 } 188 sock.configureBlocking (false); 189 sock.register (selector, SelectionKey.OP_READ); 190 nconn ++; 191 System.out.println("SERVER: new connection. chan[" + sock + "]"); 192 if (nconn == maxconn) { 193 /* deregister */ 194 listenerKey.cancel (); 195 listenerKey = null; 196 } 197 } else { 198 if (key.isReadable()) { 199 boolean closed; 200 SocketChannel chan = (SocketChannel) key.channel(); 201 System.out.println("SERVER: connection readable. chan[" + chan + "]"); 202 if (key.attachment() != null) { 203 System.out.println("Server: comsume"); 204 closed = consume (chan); 205 } else { 206 closed = read (chan, key); 207 } 208 if (closed) { 209 chan.close (); 210 key.cancel (); 211 if (nconn == maxconn) { 212 listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT); 213 } 214 nconn --; 215 } 216 } 217 } 218 iter.remove(); 219 } 220 clist.check(); 221 if (shutdown) { 222 clist.terminate (); 223 return; 224 } 225 } 226 } catch (IOException e) { 227 System.out.println ("Server exception: " + e); 228 // TODO finish 229 } 230 } 231 232 /* read all the data off the channel without looking at it 233 * return true if connection closed 234 */ 235 boolean consume (SocketChannel chan) { 236 try { 237 consumeBuffer.clear (); 238 int c = chan.read (consumeBuffer); 239 if (c == -1) 240 return true; 241 } catch (IOException e) { 242 return true; 243 } 244 return false; 245 } 246 247 /* return true if the connection is closed, false otherwise */ 248 249 private boolean read (SocketChannel chan, SelectionKey key) { 250 HttpTransaction msg; 251 
boolean res; 252 try { 253 InputStream is = new BufferedInputStream (new NioInputStream (chan)); 254 String requestline = readLine (is); 255 MessageHeader mhead = new MessageHeader (is); 256 String clen = mhead.findValue ("Content-Length"); 257 String trferenc = mhead.findValue ("Transfer-Encoding"); 258 String data = null; 259 if (trferenc != null && trferenc.equals ("chunked")) 260 data = new String (readChunkedData (is)); 261 else if (clen != null) 262 data = new String (readNormalData (is, Integer.parseInt (clen))); 263 String[] req = requestline.split (" "); 264 if (req.length < 2) { 265 /* invalid request line */ 266 return false; 267 } 268 String cmd = req[0]; 269 URI uri = null; 270 try { 271 uri = new URI (req[1]); 272 msg = new HttpTransaction (this, cmd, uri, mhead, data, null, key); 273 cb.request (msg); 274 } catch (URISyntaxException e) { 275 System.err.println ("Invalid URI: " + e); 276 msg = new HttpTransaction (this, cmd, null, null, null, null, key); 277 msg.sendResponse (501, "Whatever"); 278 } 279 res = false; 280 } catch (IOException e) { 281 res = true; 282 } 283 return res; 284 } 285 286 byte[] readNormalData (InputStream is, int len) throws IOException { 287 byte [] buf = new byte [len]; 288 int c, off=0, remain=len; 289 while (remain > 0 && ((c=is.read (buf, off, remain))>0)) { 290 remain -= c; 291 off += c; 292 } 293 return buf; 294 } 295 296 private void readCRLF(InputStream is) throws IOException { 297 int cr = is.read(); 298 int lf = is.read(); 299 300 if (((cr & 0xff) != 0x0d) || 301 ((lf & 0xff) != 0x0a)) { 302 throw new IOException( 303 "Expected <CR><LF>: got '" + cr + "/" + lf + "'"); 304 } 305 } 306 307 byte[] readChunkedData (InputStream is) throws IOException { 308 LinkedList l = new LinkedList (); 309 int total = 0; 310 for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) { 311 l.add (readNormalData(is, len)); 312 total += len; 313 readCRLF(is); // CRLF at end of chunk 314 } 315 readCRLF(is); // CRLF at end of Chunked 
Stream. 316 byte[] buf = new byte [total]; 317 Iterator i = l.iterator(); 318 int x = 0; 319 while (i.hasNext()) { 320 byte[] b = (byte[])i.next(); 321 System.arraycopy (b, 0, buf, x, b.length); 322 x += b.length; 323 } 324 return buf; 325 } 326 327 private int readChunkLen (InputStream is) throws IOException { 328 int c, len=0; 329 boolean done=false, readCR=false; 330 while (!done) { 331 c = is.read (); 332 if (c == '\n' && readCR) { 333 done = true; 334 } else { 335 if (c == '\r' && !readCR) { 336 readCR = true; 337 } else { 338 int x=0; 339 if (c >= 'a' && c <= 'f') { 340 x = c - 'a' + 10; 341 } else if (c >= 'A' && c <= 'F') { 342 x = c - 'A' + 10; 343 } else if (c >= '0' && c <= '9') { 344 x = c - '0'; 345 } 346 len = len * 16 + x; 347 } 348 } 349 } 350 return len; 351 } 352 353 private String readLine (InputStream is) throws IOException { 354 boolean done=false, readCR=false; 355 byte[] b = new byte [512]; 356 int c, l = 0; 357 358 while (!done) { 359 c = is.read (); 360 if (c == '\n' && readCR) { 361 done = true; 362 } else { 363 if (c == '\r' && !readCR) { 364 readCR = true; 365 } else { 366 b[l++] = (byte)c; 367 } 368 } 369 } 370 return new String (b); 371 } 372 373 /** close the channel associated with the current key by: 374 * 1. shutdownOutput (send a FIN) 375 * 2. mark the key so that incoming data is to be consumed and discarded 376 * 3. 
After a period, close the socket 377 */ 378 379 synchronized void orderlyCloseChannel (SelectionKey key) throws IOException { 380 SocketChannel ch = (SocketChannel)key.channel (); 381 System.out.println("SERVER: orderlyCloseChannel chan[" + ch + "]"); 382 ch.socket().shutdownOutput(); 383 key.attach (this); 384 clist.add (key); 385 } 386 387 synchronized void abortiveCloseChannel (SelectionKey key) throws IOException { 388 SocketChannel ch = (SocketChannel)key.channel (); 389 System.out.println("SERVER: abortiveCloseChannel chan[" + ch + "]"); 390 391 Socket s = ch.socket (); 392 s.setSoLinger (true, 0); 393 ch.close(); 394 } 395 } 396 397 398 /** 399 * Implements blocking reading semantics on top of a non-blocking channel 400 */ 401 402 static class NioInputStream extends InputStream { 403 SocketChannel channel; 404 Selector selector; 405 ByteBuffer chanbuf; 406 SelectionKey key; 407 int available; 408 byte[] one; 409 boolean closed; 410 ByteBuffer markBuf; /* reads may be satisifed from this buffer */ 411 boolean marked; 412 boolean reset; 413 int readlimit; 414 415 public NioInputStream (SocketChannel chan) throws IOException { 416 this.channel = chan; 417 selector = Selector.open(); 418 chanbuf = ByteBuffer.allocate (1024); 419 key = chan.register (selector, SelectionKey.OP_READ); 420 available = 0; 421 one = new byte[1]; 422 closed = marked = reset = false; 423 } 424 425 public synchronized int read (byte[] b) throws IOException { 426 return read (b, 0, b.length); 427 } 428 429 public synchronized int read () throws IOException { 430 return read (one, 0, 1); 431 } 432 433 public synchronized int read (byte[] b, int off, int srclen) throws IOException { 434 435 int canreturn, willreturn; 436 437 if (closed) 438 return -1; 439 440 if (reset) { /* satisfy from markBuf */ 441 canreturn = markBuf.remaining (); 442 willreturn = canreturn>srclen ? 
srclen : canreturn; 443 markBuf.get(b, off, willreturn); 444 if (canreturn == willreturn) { 445 reset = false; 446 } 447 } else { /* satisfy from channel */ 448 canreturn = available(); 449 if (canreturn == 0) { 450 block (); 451 canreturn = available(); 452 } 453 willreturn = canreturn>srclen ? srclen : canreturn; 454 chanbuf.get(b, off, willreturn); 455 available -= willreturn; 456 457 if (marked) { /* copy into markBuf */ 458 try { 459 markBuf.put (b, off, willreturn); 460 } catch (BufferOverflowException e) { 461 marked = false; 462 } 463 } 464 } 465 return willreturn; 466 } 467 468 public synchronized int available () throws IOException { 469 if (closed) 470 throw new IOException ("Stream is closed"); 471 472 if (reset) 473 return markBuf.remaining(); 474 475 if (available > 0) 476 return available; 477 478 chanbuf.clear (); 479 available = channel.read (chanbuf); 480 if (available > 0) 481 chanbuf.flip(); 482 else if (available == -1) 483 throw new IOException ("Stream is closed"); 484 return available; 485 } 486 487 /** 488 * block() only called when available==0 and buf is empty 489 */ 490 private synchronized void block () throws IOException { 491 //assert available == 0; 492 int n = selector.select (); 493 //assert n == 1; 494 selector.selectedKeys().clear(); 495 available (); 496 } 497 498 public void close () throws IOException { 499 if (closed) 500 return; 501 channel.close (); 502 closed = true; 503 } 504 505 public synchronized void mark (int readlimit) { 506 if (closed) 507 return; 508 this.readlimit = readlimit; 509 markBuf = ByteBuffer.allocate (readlimit); 510 marked = true; 511 reset = false; 512 } 513 514 public synchronized void reset () throws IOException { 515 if (closed ) 516 return; 517 if (!marked) 518 throw new IOException ("Stream not marked"); 519 marked = false; 520 reset = true; 521 markBuf.flip (); 522 } 523 } 524 525 static class NioOutputStream extends OutputStream { 526 SocketChannel channel; 527 ByteBuffer buf; 528 SelectionKey 
key; 529 Selector selector; 530 boolean closed; 531 byte[] one; 532 533 public NioOutputStream (SocketChannel channel) throws IOException { 534 this.channel = channel; 535 selector = Selector.open (); 536 key = channel.register (selector, SelectionKey.OP_WRITE); 537 closed = false; 538 one = new byte [1]; 539 } 540 541 public synchronized void write (int b) throws IOException { 542 one[0] = (byte)b; 543 write (one, 0, 1); 544 } 545 546 public synchronized void write (byte[] b) throws IOException { 547 write (b, 0, b.length); 548 } 549 550 public synchronized void write (byte[] b, int off, int len) throws IOException { 551 if (closed) 552 throw new IOException ("stream is closed"); 553 554 buf = ByteBuffer.allocate (len); 555 buf.put (b, off, len); 556 buf.flip (); 557 int n; 558 while ((n = channel.write (buf)) < len) { 559 len -= n; 560 if (len == 0) 561 return; 562 selector.select (); 563 selector.selectedKeys().clear (); 564 } 565 } 566 567 public void close () throws IOException { 568 if (closed) 569 return; 570 channel.close (); 571 closed = true; 572 } 573 } 574 575 /** 576 * Utilities for synchronization. A condition is 577 * identified by a string name, and is initialized 578 * upon first use (ie. setCondition() or waitForCondition()). Threads 579 * are blocked until some thread calls (or has called) setCondition() for the same 580 * condition. 581 * <P> 582 * A rendezvous built on a condition is also provided for synchronizing 583 * N threads. 
584 */ 585 586 private static HashMap conditions = new HashMap(); 587 588 /* 589 * Modifiable boolean object 590 */ 591 private static class BValue { 592 boolean v; 593 } 594 595 /* 596 * Modifiable int object 597 */ 598 private static class IValue { 599 int v; 600 IValue (int i) { 601 v =i; 602 } 603 } 604 605 606 private static BValue getCond (String condition) { 607 synchronized (conditions) { 608 BValue cond = (BValue) conditions.get (condition); 609 if (cond == null) { 610 cond = new BValue(); 611 conditions.put (condition, cond); 612 } 613 return cond; 614 } 615 } 616 617 /** 618 * Set the condition to true. Any threads that are currently blocked 619 * waiting on the condition, will be unblocked and allowed to continue. 620 * Threads that subsequently call waitForCondition() will not block. 621 * If the named condition did not exist prior to the call, then it is created 622 * first. 623 */ 624 625 public static void setCondition (String condition) { 626 BValue cond = getCond (condition); 627 synchronized (cond) { 628 if (cond.v) { 629 return; 630 } 631 cond.v = true; 632 cond.notifyAll(); 633 } 634 } 635 636 /** 637 * If the named condition does not exist, then it is created and initialized 638 * to false. If the condition exists or has just been created and its value 639 * is false, then the thread blocks until another thread sets the condition. 640 * If the condition exists and is already set to true, then this call returns 641 * immediately without blocking. 642 */ 643 644 public static void waitForCondition (String condition) { 645 BValue cond = getCond (condition); 646 synchronized (cond) { 647 if (!cond.v) { 648 try { 649 cond.wait(); 650 } catch (InterruptedException e) {} 651 } 652 } 653 } 654 655 /* conditions must be locked when accessing this */ 656 static HashMap rv = new HashMap(); 657 658 /** 659 * Force N threads to rendezvous (ie. wait for each other) before proceeding. 
660 * The first thread(s) to call are blocked until the last 661 * thread makes the call. Then all threads continue. 662 * <p> 663 * All threads that call with the same condition name, must use the same value 664 * for N (or the results may be not be as expected). 665 * <P> 666 * Obviously, if fewer than N threads make the rendezvous then the result 667 * will be a hang. 668 */ 669 670 public static void rendezvous (String condition, int N) { 671 BValue cond; 672 IValue iv; 673 String name = "RV_"+condition; 674 675 /* get the condition */ 676 677 synchronized (conditions) { 678 cond = (BValue)conditions.get (name); 679 if (cond == null) { 680 /* we are first caller */ 681 if (N < 2) { 682 throw new RuntimeException ("rendezvous must be called with N >= 2"); 683 } 684 cond = new BValue (); 685 conditions.put (name, cond); 686 iv = new IValue (N-1); 687 rv.put (name, iv); 688 } else { 689 /* already initialised, just decrement the counter */ 690 iv = (IValue) rv.get (name); 691 iv.v --; 692 } 693 } 694 695 if (iv.v > 0) { 696 waitForCondition (name); 697 } else { 698 setCondition (name); 699 synchronized (conditions) { 700 clearCondition (name); 701 rv.remove (name); 702 } 703 } 704 } 705 706 /** 707 * If the named condition exists and is set then remove it, so it can 708 * be re-initialized and used again. If the condition does not exist, or 709 * exists but is not set, then the call returns without doing anything. 710 * Note, some higher level synchronization 711 * may be needed between clear and the other operations. 712 */ 713 714 public static void clearCondition(String condition) { 715 BValue cond; 716 synchronized (conditions) { 717 cond = (BValue) conditions.get (condition); 718 if (cond == null) { 719 return; 720 } 721 synchronized (cond) { 722 if (cond.v) { 723 conditions.remove (condition); 724 } 725 } 726 } 727 } 728} 729