1/*
2 * Copyright (c) 1996, 2012, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package sun.net.www.http;
27
28import java.io.*;
29
30import sun.net.ProgressSource;
31import sun.net.www.MeteredStream;
32import jdk.internal.misc.InnocuousThread;
33
34/**
35 * A stream that has the property of being able to be kept alive for
36 * multiple downloads from the same server.
37 *
38 * @author Stephen R. Pietrowicz (NCSA)
39 * @author Dave Brown
40 */
41public
42class KeepAliveStream extends MeteredStream implements Hurryable {
43
44    // instance variables
45    HttpClient hc;
46
47    boolean hurried;
48
49    // has this KeepAliveStream been put on the queue for asynchronous cleanup.
50    protected boolean queuedForCleanup = false;
51
52    private static final KeepAliveStreamCleaner queue = new KeepAliveStreamCleaner();
53    private static Thread cleanerThread; // null
54
55    /**
56     * Constructor
57     */
58    public KeepAliveStream(InputStream is, ProgressSource pi, long expected, HttpClient hc)  {
59        super(is, pi, expected);
60        this.hc = hc;
61    }
62
63    /**
64     * Attempt to cache this connection
65     */
66    public void close() throws IOException  {
67        // If the inputstream is closed already, just return.
68        if (closed) {
69            return;
70        }
71
72        // If this stream has already been queued for cleanup.
73        if (queuedForCleanup) {
74            return;
75        }
76
77        // Skip past the data that's left in the Inputstream because
78        // some sort of error may have occurred.
79        // Do this ONLY if the skip won't block. The stream may have
80        // been closed at the beginning of a big file and we don't want
81        // to hang around for nothing. So if we can't skip without blocking
82        // we just close the socket and, therefore, terminate the keepAlive
83        // NOTE: Don't close super class
84        try {
85            if (expected > count) {
86                long nskip = expected - count;
87                if (nskip <= available()) {
88                    do {} while ((nskip = (expected - count)) > 0L
89                                 && skip(Math.min(nskip, available())) > 0L);
90                } else if (expected <= KeepAliveStreamCleaner.MAX_DATA_REMAINING && !hurried) {
91                    //put this KeepAliveStream on the queue so that the data remaining
92                    //on the socket can be cleanup asyncronously.
93                    queueForCleanup(new KeepAliveCleanerEntry(this, hc));
94                } else {
95                    hc.closeServer();
96                }
97            }
98            if (!closed && !hurried && !queuedForCleanup) {
99                hc.finished();
100            }
101        } finally {
102            if (pi != null)
103                pi.finishTracking();
104
105            if (!queuedForCleanup) {
106                // nulling out the underlying inputstream as well as
107                // httpClient to let gc collect the memories faster
108                in = null;
109                hc = null;
110                closed = true;
111            }
112        }
113    }
114
115    /* we explicitly do not support mark/reset */
116
117    public boolean markSupported()  {
118        return false;
119    }
120
121    public void mark(int limit) {}
122
123    public void reset() throws IOException {
124        throw new IOException("mark/reset not supported");
125    }
126
127    public synchronized boolean hurry() {
128        try {
129            /* CASE 0: we're actually already done */
130            if (closed || count >= expected) {
131                return false;
132            } else if (in.available() < (expected - count)) {
133                /* CASE I: can't meet the demand */
134                return false;
135            } else {
136                /* CASE II: fill our internal buffer
137                 * Remind: possibly check memory here
138                 */
139                int size = (int) (expected - count);
140                byte[] buf = new byte[size];
141                DataInputStream dis = new DataInputStream(in);
142                dis.readFully(buf);
143                in = new ByteArrayInputStream(buf);
144                hurried = true;
145                return true;
146            }
147        } catch (IOException e) {
148            // e.printStackTrace();
149            return false;
150        }
151    }
152
153    private static void queueForCleanup(KeepAliveCleanerEntry kace) {
154        synchronized(queue) {
155            if(!kace.getQueuedForCleanup()) {
156                if (!queue.offer(kace)) {
157                    kace.getHttpClient().closeServer();
158                    return;
159                }
160
161                kace.setQueuedForCleanup();
162                queue.notifyAll();
163            }
164
165            boolean startCleanupThread = (cleanerThread == null);
166            if (!startCleanupThread) {
167                if (!cleanerThread.isAlive()) {
168                    startCleanupThread = true;
169                }
170            }
171
172            if (startCleanupThread) {
173                java.security.AccessController.doPrivileged(
174                    new java.security.PrivilegedAction<Void>() {
175                    public Void run() {
176                        cleanerThread = InnocuousThread.newSystemThread("Keep-Alive-SocketCleaner", queue);
177                        cleanerThread.setDaemon(true);
178                        cleanerThread.setPriority(Thread.MAX_PRIORITY - 2);
179                        cleanerThread.start();
180                        return null;
181                    }
182                });
183            }
184        } // queue
185    }
186
187    protected long remainingToRead() {
188        return expected - count;
189    }
190
191    protected void setClosed() {
192        in = null;
193        hc = null;
194        closed = true;
195    }
196}
197