StreamPumper.java revision 2224:2a8815d86b93
1/*
2 * Copyright (c) 2013, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This code is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 * version 2 for more details (a copy is included in the LICENSE file that
13 * accompanied this code).
14 *
15 * You should have received a copy of the GNU General Public License version
16 * 2 along with this work; if not, write to the Free Software Foundation,
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18 *
19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20 * or visit www.oracle.com if you need additional information or have any
21 * questions.
22 */
23
24package jdk.test.lib.process;
25
26import java.io.BufferedInputStream;
27import java.io.ByteArrayOutputStream;
28import java.io.OutputStream;
29import java.io.InputStream;
30import java.io.IOException;
31import java.util.HashSet;
32import java.util.Set;
33import java.util.concurrent.Future;
34import java.util.concurrent.FutureTask;
35import java.util.concurrent.atomic.AtomicBoolean;
36
/**
 * Copies everything read from an {@link InputStream} to a set of registered
 * consumers: raw {@link OutputStream}s, which receive the bytes unchanged, and
 * {@link LinePump}s, which receive the data split into lines.
 *
 * <p>Consumers are registered through the constructor or {@link #addPump}.
 * The copy loop is {@link #run()}; {@link #process()} runs it on a new daemon
 * thread and returns a {@link Future} that completes when the loop is done.
 */
public final class StreamPumper implements Runnable {

    /** Size, in bytes, of the buffer used for each read from the input. */
    private static final int BUF_SIZE = 256;

    /**
     * Pump will be called by the StreamPumper to process the incoming data
     */
    public abstract static class Pump {
        /**
         * Attaches this pump to the given pumper.
         *
         * @param d the pumper this pump should receive data from
         */
        abstract void register(StreamPumper d);
    }

    /**
     * OutputStream -> Pump adapter
     */
    public static final class StreamPump extends Pump {
        private final OutputStream out;

        /**
         * @param out the stream that will receive a copy of the pumped bytes
         */
        public StreamPump(OutputStream out) {
            this.out = out;
        }

        @Override
        void register(StreamPumper sp) {
            sp.addOutputStream(out);
        }
    }

    /**
     * Used to process the incoming data line-by-line
     */
    public abstract static class LinePump extends Pump {
        @Override
        final void register(StreamPumper sp) {
            sp.addLineProcessor(this);
        }

        /**
         * Called once for every non-empty line read from the input.
         *
         * @param line the line content, without its line terminator
         */
        protected abstract void processLine(String line);
    }

    private final InputStream in;
    private final Set<OutputStream> outStreams = new HashSet<>();
    private final Set<LinePump> linePumps = new HashSet<>();

    /** Set by {@link #process()}; once true, {@link #addPump} is rejected. */
    private final AtomicBoolean processing = new AtomicBoolean(false);
    private final FutureTask<Void> processingTask = new FutureTask<>(this, null);

    /**
     * Create a StreamPumper that reads from in and has no consumers yet.
     *
     * @param in The stream to read from.
     */
    public StreamPumper(InputStream in) {
        this.in = in;
    }

    /**
     * Create a StreamPumper that reads from in and writes to out.
     *
     * @param in The stream to read from.
     * @param out The stream to write to.
     */
    public StreamPumper(InputStream in, OutputStream out) {
        this(in);
        this.addOutputStream(out);
    }

    /**
     * Implements {@link Runnable#run()}. Continuously reads from {@code in},
     * writes every chunk to all registered output streams and feeds it to the
     * registered line pumps, until {@code in} has reached end of stream.
     * Aborts on interruption. Aborts on IOExceptions (the stack trace is
     * printed).
     *
     * <p>Lines are delimited by {@code '\n'} or {@code '\r'}; empty lines are
     * not reported. A final line that is not followed by a terminator is
     * reported when end of stream is reached. Line bytes are decoded with the
     * platform default charset.
     *
     * <p>In every case the output streams are flushed and {@code in} is closed
     * before this method returns.
     */
    @Override
    public void run() {
        try (BufferedInputStream is = new BufferedInputStream(in)) {
            // Bytes of the current, not yet terminated line. It lives across
            // reads because a line may span several buffers.
            ByteArrayOutputStream pendingLine = new ByteArrayOutputStream();
            byte[] buf = new byte[BUF_SIZE];
            int len = 0;

            // Note: Thread.interrupted() clears the thread's interrupt status.
            while ((len = is.read(buf)) > 0 && !Thread.interrupted()) {
                for (OutputStream out : outStreams) {
                    out.write(buf, 0, len);
                }
                if (!linePumps.isEmpty()) {
                    splitLines(buf, len, pendingLine);
                }
            }

            // len > 0 here means the loop was left because of an interrupt;
            // only a regular end of stream completes the trailing line.
            if (len <= 0) {
                deliverLine(pendingLine);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            for (OutputStream out : outStreams) {
                try {
                    out.flush();
                } catch (IOException ignored) {
                    // Best effort: keep flushing the remaining streams.
                }
            }
            try {
                in.close();
            } catch (IOException ignored) {
                // Best effort: nothing useful can be done if close fails.
            }
        }
    }

    /**
     * Scans {@code buf[0..len)} for line terminators, reporting every line
     * completed in this chunk and keeping the unterminated remainder in
     * {@code pendingLine} for the next call.
     */
    private void splitLines(byte[] buf, int len, ByteArrayOutputStream pendingLine) {
        int start = 0; // index of the first byte not yet copied to pendingLine
        for (int i = 0; i < len; i++) {
            if (buf[i] == '\n' || buf[i] == '\r') {
                if (i > start) {
                    pendingLine.write(buf, start, i - start);
                }
                deliverLine(pendingLine);
                start = i + 1;
            }
        }
        if (start < len) {
            pendingLine.write(buf, start, len - start);
        }
    }

    /**
     * Passes the content of {@code pendingLine} to every line pump and clears
     * it; does nothing when it is empty, so empty lines are never reported.
     */
    private void deliverLine(ByteArrayOutputStream pendingLine) {
        if (pendingLine.size() == 0) {
            return;
        }
        final String line = pendingLine.toString();
        for (LinePump lp : linePumps) {
            lp.processLine(line);
        }
        pendingLine.reset();
    }

    /** Registers a stream that receives a copy of every chunk read. */
    final void addOutputStream(OutputStream out) {
        outStreams.add(out);
    }

    /** Registers a pump that receives the input line by line. */
    final void addLineProcessor(LinePump lp) {
        linePumps.add(lp);
    }

    /**
     * Registers the given pumps with this pumper.
     *
     * @param pump the pumps to register
     * @return this pumper, to allow call chaining
     * @throws IllegalStateException if {@link #process()} has already been
     *         called
     */
    public final StreamPumper addPump(Pump ... pump) {
        if (processing.get()) {
            throw new IllegalStateException("Can not modify pumper while " +
                                            "processing is in progress");
        }
        for (Pump p : pump) {
            p.register(this);
        }
        return this;
    }

    /**
     * Starts pumping on a new daemon thread.
     *
     * @return a future that completes once {@link #run()} has finished
     * @throws IllegalStateException if this method has been called before
     */
    public final Future<Void> process() {
        if (!processing.compareAndSet(false, true)) {
            throw new IllegalStateException("Can not re-run the processing");
        }
        // FutureTask is itself a Runnable, so it can be handed to the thread
        // directly.
        Thread t = new Thread(processingTask);
        t.setDaemon(true);
        t.start();

        return processingTask;
    }
}
198