1/*
2 * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24package jdk.testlibrary;
25
26import java.io.BufferedInputStream;
27import java.io.ByteArrayOutputStream;
28import java.io.OutputStream;
29import java.io.InputStream;
30import java.io.IOException;
31import java.util.HashSet;
32import java.util.Set;
33import java.util.concurrent.Future;
34import java.util.concurrent.FutureTask;
35import java.util.concurrent.atomic.AtomicBoolean;
36
37/**
38 * @deprecated This class is deprecated. Use the one from
39 *             {@code <root>/test/lib/share/classes/jdk/test/lib/process}
40 */
41@Deprecated
42public final class StreamPumper implements Runnable {
43
44    private static final int BUF_SIZE = 256;
45
46    /**
47     * Pump will be called by the StreamPumper to process the incoming data
48     */
49    abstract public static class Pump {
50        abstract void register(StreamPumper d);
51    }
52
53    /**
54     * OutputStream -> Pump adapter
55     */
56    final public static class StreamPump extends Pump {
57        private final OutputStream out;
58        public StreamPump(OutputStream out) {
59            this.out = out;
60        }
61
62        @Override
63        void register(StreamPumper sp) {
64            sp.addOutputStream(out);
65        }
66    }
67
68    /**
69     * Used to process the incoming data line-by-line
70     */
71    abstract public static class LinePump extends Pump {
72        @Override
73        final void register(StreamPumper sp) {
74            sp.addLineProcessor(this);
75        }
76
77        abstract protected void processLine(String line);
78    }
79
80    private final InputStream in;
81    private final Set<OutputStream> outStreams = new HashSet<>();
82    private final Set<LinePump> linePumps = new HashSet<>();
83
84    private final AtomicBoolean processing = new AtomicBoolean(false);
85    private final FutureTask<Void> processingTask = new FutureTask<>(this, null);
86
87    public StreamPumper(InputStream in) {
88        this.in = in;
89    }
90
91    /**
92     * Create a StreamPumper that reads from in and writes to out.
93     *
94     * @param in
95     *            The stream to read from.
96     * @param out
97     *            The stream to write to.
98     */
99    public StreamPumper(InputStream in, OutputStream out) {
100        this(in);
101        this.addOutputStream(out);
102    }
103
104    /**
105     * Implements Thread.run(). Continuously read from {@code in} and write to
106     * {@code out} until {@code in} has reached end of stream. Abort on
107     * interruption. Abort on IOExceptions.
108     */
109    @Override
110    public void run() {
111        try (BufferedInputStream is = new BufferedInputStream(in)) {
112            ByteArrayOutputStream lineBos = new ByteArrayOutputStream();
113            byte[] buf = new byte[BUF_SIZE];
114            int len = 0;
115            int linelen = 0;
116
117            while ((len = is.read(buf)) > 0 && !Thread.interrupted()) {
118                for(OutputStream out : outStreams) {
119                    out.write(buf, 0, len);
120                }
121                if (!linePumps.isEmpty()) {
122                    int i = 0;
123                    int lastcrlf = -1;
124                    while (i < len) {
125                        if (buf[i] == '\n' || buf[i] == '\r') {
126                            int bufLinelen = i - lastcrlf - 1;
127                            if (bufLinelen > 0) {
128                                lineBos.write(buf, lastcrlf + 1, bufLinelen);
129                            }
130                            linelen += bufLinelen;
131
132                            if (linelen > 0) {
133                                lineBos.flush();
134                                final String line = lineBos.toString();
135                                linePumps.stream().forEach((lp) -> {
136                                    lp.processLine(line);
137                                });
138                                lineBos.reset();
139                                linelen = 0;
140                            }
141                            lastcrlf = i;
142                        }
143
144                        i++;
145                    }
146                    if (lastcrlf == -1) {
147                        lineBos.write(buf, 0, len);
148                        linelen += len;
149                    } else if (lastcrlf < len - 1) {
150                        lineBos.write(buf, lastcrlf + 1, len - lastcrlf - 1);
151                        linelen += len - lastcrlf - 1;
152                    }
153                }
154            }
155
156        } catch (IOException e) {
157            e.printStackTrace();
158        } finally {
159            for(OutputStream out : outStreams) {
160                try {
161                    out.flush();
162                } catch (IOException e) {}
163            }
164            try {
165                in.close();
166            } catch (IOException e) {}
167        }
168    }
169
170    final void addOutputStream(OutputStream out) {
171        outStreams.add(out);
172    }
173
174    final void addLineProcessor(LinePump lp) {
175        linePumps.add(lp);
176    }
177
178    final public StreamPumper addPump(Pump ... pump) {
179        if (processing.get()) {
180            throw new IllegalStateException("Can not modify pumper while " +
181                                            "processing is in progress");
182        }
183        for(Pump p : pump) {
184            p.register(this);
185        }
186        return this;
187    }
188
189    final public Future<Void> process() {
190        if (!processing.compareAndSet(false, true)) {
191            throw new IllegalStateException("Can not re-run the processing");
192        }
193        Thread t = new Thread(new Runnable() {
194            @Override
195            public void run() {
196                processingTask.run();
197            }
198        });
199        t.setDaemon(true);
200        t.start();
201
202        return processingTask;
203    }
204}
205