1/*
2 * Copyright (c) 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24import java.io.*;
25import java.nio.ByteBuffer;
26
27import jdk.incubator.http.internal.common.ByteBufferReference;
28import jdk.incubator.http.internal.common.Queue;
29import jdk.incubator.http.internal.frame.DataFrame;
30
31/**
32 * OutputStream. Incoming window updates handled by the main connection
33 * reader thread.
34 */
35@SuppressWarnings({"rawtypes","unchecked"})
36class BodyOutputStream extends OutputStream {
37    final static byte[] EMPTY_BARRAY = new byte[0];
38
39    final int streamid;
40    int window;
41    boolean closed;
42    boolean goodToGo = false; // not allowed to send until headers sent
43    final Http2TestServerConnection conn;
44    final Queue outputQ;
45
46    BodyOutputStream(int streamid, int initialWindow, Http2TestServerConnection conn) {
47        this.window = initialWindow;
48        this.streamid = streamid;
49        this.conn = conn;
50        this.outputQ = conn.outputQ;
51        conn.registerStreamWindowUpdater(streamid, this::updateWindow);
52    }
53
54    // called from connection reader thread as all incoming window
55    // updates are handled there.
56    synchronized void updateWindow(int update) {
57        window += update;
58        notifyAll();
59    }
60
61    void waitForWindow(int demand) throws InterruptedException {
62        // first wait for the connection window
63        conn.obtainConnectionWindow(demand);
64        // now wait for the stream window
65        synchronized (this) {
66            while (demand > 0) {
67                int n = Math.min(demand, window);
68                demand -= n;
69                window -= n;
70                if (demand > 0) {
71                    wait();
72                }
73            }
74        }
75    }
76
77    void goodToGo() {
78        goodToGo = true;
79    }
80
81    @Override
82    public void write(byte[] buf, int offset, int len) throws IOException {
83        if (closed) {
84            throw new IOException("closed");
85        }
86
87        if (!goodToGo) {
88            throw new IllegalStateException("sendResponseHeaders must be called first");
89        }
90        try {
91            waitForWindow(len);
92            send(buf, offset, len, 0);
93        } catch (InterruptedException ex) {
94            throw new IOException(ex);
95        }
96    }
97
98    private void send(byte[] buf, int offset, int len, int flags) throws IOException {
99        ByteBuffer buffer = ByteBuffer.allocate(len);
100        buffer.put(buf, offset, len);
101        buffer.flip();
102        assert streamid != 0;
103        DataFrame df = new DataFrame(streamid, flags, ByteBufferReference.of(buffer));
104        outputQ.put(df);
105    }
106
107    byte[] one = new byte[1];
108
109    @Override
110    public void write(int b) throws IOException {
111        one[0] = (byte) b;
112        write(one, 0, 1);
113    }
114
115    void closeInternal() {
116        closed = true;
117    }
118
119    @Override
120    public void close() {
121        if (closed) {
122            return;
123        }
124        closed = true;
125        try {
126            send(EMPTY_BARRAY, 0, 0, DataFrame.END_STREAM);
127        } catch (IOException ex) {
128            System.err.println("TestServer: OutputStream.close exception: " + ex);
129            ex.printStackTrace();
130        }
131    }
132}
133