1/*
2 * Copyright (c) 2008, 2014, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package sun.nio.fs;
27
28import java.nio.file.*;
29import java.security.AccessController;
30import java.security.PrivilegedAction;
31import java.io.IOException;
32import java.util.*;
33
/**
 * Base implementation of background poller thread used in watch service
 * implementations. A poller thread waits on events from the file system and
 * also services "requests" from clients to register for new events or cancel
 * existing registrations.
 *
 * <p> Client threads submit a request with {@code register}, {@code cancel}
 * or {@code close}, wake the poller with {@link #wakeup()}, and then block
 * until the poller thread has handled the request in
 * {@link #processRequests()}.
 */
abstract class AbstractPoller implements Runnable {

    // queue of requests pending to the poller thread; this object is also the
    // lock under which the queue and the shutdown flag are updated
    private final Queue<Request> requestList;

    // set to true (by processRequests) once a CLOSE request has been handled
    private boolean shutdown;

    protected AbstractPoller() {
        this.requestList = new ArrayDeque<>();
        this.shutdown = false;
    }

    /**
     * Starts the poller thread. The thread is created in a privileged block,
     * is named "FileSystemWatchService" and is marked as a daemon thread.
     */
    public void start() {
        final Runnable thisRunnable = this;
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
            @Override
            public Void run() {
                // stack size 0 = no specific size requested;
                // false = do not inherit inheritable thread-locals
                Thread thr = new Thread(null,
                                        thisRunnable,
                                        "FileSystemWatchService",
                                        0,
                                        false);
                thr.setDaemon(true);
                thr.start();
                return null;
            }
        });
    }

    /**
     * Wakeup poller thread so that it can service pending requests
     */
    abstract void wakeup() throws IOException;

    /**
     * Executed by poller thread to register directory for changes. The
     * returned object is handed back to the thread that called
     * {@code register}; if it is a {@code RuntimeException} or an
     * {@code IOException} it is thrown in that thread instead of returned.
     */
    abstract Object implRegister(Path path,
                                 Set<? extends WatchEvent.Kind<?>> events,
                                 WatchEvent.Modifier... modifiers);

    /**
     * Executed by poller thread to cancel key
     */
    abstract void implCancelKey(WatchKey key);

    /**
     * Executed by poller thread to shutdown and cancel all keys
     */
    abstract void implCloseAll();

    /**
     * Requests, and waits on, poller thread to register given file.
     *
     * @param  dir        the directory to register
     * @param  events     the events to register for; {@code OVERFLOW} is
     *                    accepted but ignored
     * @param  modifiers  modifiers passed through to {@code implRegister}
     * @return the key produced by the poller thread
     * @throws NullPointerException if {@code dir}, {@code events} or an
     *         element of {@code events} is {@code null}
     * @throws UnsupportedOperationException if an event is not one of the
     *         standard watch event kinds
     * @throws IllegalArgumentException if no event remains to register
     * @throws ClosedWatchServiceException if the poller has been shut down
     * @throws IOException if waking the poller fails or the poller reports
     *         an I/O error
     */
    final WatchKey register(Path dir,
                            WatchEvent.Kind<?>[] events,
                            WatchEvent.Modifier... modifiers)
        throws IOException
    {
        // validate arguments before request to poller
        Objects.requireNonNull(dir);
        Set<WatchEvent.Kind<?>> eventSet = new HashSet<>(events.length);
        for (WatchEvent.Kind<?> event: events) {
            // standard events
            if (event == StandardWatchEventKinds.ENTRY_CREATE ||
                event == StandardWatchEventKinds.ENTRY_MODIFY ||
                event == StandardWatchEventKinds.ENTRY_DELETE)
            {
                eventSet.add(event);
                continue;
            }

            // OVERFLOW is ignored
            if (event == StandardWatchEventKinds.OVERFLOW)
                continue;

            // null/unsupported
            if (event == null)
                throw new NullPointerException("An element in event set is 'null'");
            throw new UnsupportedOperationException(event.name());
        }
        if (eventSet.isEmpty())
            throw new IllegalArgumentException("No events to register");
        return (WatchKey)invoke(RequestType.REGISTER, dir, eventSet, modifiers);
    }

    /**
     * Cancels, and waits on, poller thread to cancel given key.
     *
     * @throws ClosedWatchServiceException if the poller has been shut down
     * @throws AssertionError if the request unexpectedly fails with an
     *         {@code IOException}; the I/O exception is kept as the cause
     */
    final void cancel(WatchKey key) {
        try {
            invoke(RequestType.CANCEL, key);
        } catch (IOException x) {
            // should not happen; keep the original exception as the cause so
            // that its stack trace is not lost
            throw new AssertionError(x.getMessage(), x);
        }
    }

    /**
     * Shutdown poller thread
     *
     * @throws ClosedWatchServiceException if the poller is already shut down
     * @throws IOException if waking the poller fails
     */
    final void close() throws IOException {
        invoke(RequestType.CLOSE);
    }

    /**
     * Types of request that the poller thread must handle
     */
    private enum RequestType {
        REGISTER,
        CANCEL,
        CLOSE;
    }

    /**
     * Encapsulates a request (command) to the poller thread. The submitting
     * thread blocks in {@code awaitResult} until the poller thread calls
     * {@code release}.
     */
    private static final class Request {
        private final RequestType type;
        private final Object[] params;

        // both fields are accessed only while holding this request's monitor
        private boolean completed = false;
        private Object result = null;

        Request(RequestType type, Object... params) {
            this.type = type;
            this.params = params;
        }

        RequestType type() {
            return type;
        }

        Object[] parameters() {
            return params;
        }

        /**
         * Marks the request as completed with the given result and wakes up
         * any thread blocked in {@code awaitResult}.
         */
        void release(Object result) {
            synchronized (this) {
                this.completed = true;
                this.result = result;
                notifyAll();
            }
        }

        /**
         * Await completion of the request. The return value is the result of
         * the request. An interrupt does not abort the wait; the interrupt
         * status is restored before returning.
         */
        Object awaitResult() {
            boolean interrupted = false;
            synchronized (this) {
                while (!completed) {
                    try {
                        wait();
                    } catch (InterruptedException x) {
                        interrupted = true;
                    }
                }
                if (interrupted)
                    Thread.currentThread().interrupt();
                return result;
            }
        }
    }

    /**
     * Enqueues request to poller thread and waits for result.
     *
     * @return the result released by the poller thread
     * @throws ClosedWatchServiceException if the poller has been shut down
     * @throws IOException if {@code wakeup} fails, or if the result is an
     *         {@code IOException}
     * @throws RuntimeException if the result is a {@code RuntimeException}
     */
    private Object invoke(RequestType type, Object... params) throws IOException {
        // submit request
        Request req = new Request(type, params);
        synchronized (requestList) {
            if (shutdown) {
                throw new ClosedWatchServiceException();
            }
            requestList.add(req);

            // wakeup thread; if that fails the caller is told the request
            // failed, so withdraw it rather than leave it queued to be
            // executed by a later processRequests() with nobody waiting
            boolean woken = false;
            try {
                wakeup();
                woken = true;
            } finally {
                if (!woken)
                    requestList.remove(req);
            }
        }

        // wait for result
        Object result = req.awaitResult();

        if (result instanceof RuntimeException)
            throw (RuntimeException)result;
        if (result instanceof IOException)
            throw (IOException)result;
        return result;
    }

    /**
     * Invoked by poller thread to process all pending requests. Requests
     * still queued after a CLOSE request has been handled are completed with
     * a {@code ClosedWatchServiceException}.
     *
     * <p> NOTE(review): a request is released only after its {@code impl*}
     * hook returns, so an unchecked exception thrown by a hook would leave
     * the submitting thread waiting - confirm hooks never throw.
     *
     * @return  true if poller thread should shutdown
     */
    boolean processRequests() {
        synchronized (requestList) {
            Request req;
            while ((req = requestList.poll()) != null) {
                // if in process of shutdown then reject request
                if (shutdown) {
                    req.release(new ClosedWatchServiceException());
                    continue;
                }

                switch (req.type()) {
                    /**
                     * Register directory
                     */
                    case REGISTER: {
                        Object[] params = req.parameters();
                        Path path = (Path)params[0];
                        // the only REGISTER producer is register(), which
                        // always passes a Set of event kinds in this slot
                        @SuppressWarnings("unchecked")
                        Set<? extends WatchEvent.Kind<?>> events =
                            (Set<? extends WatchEvent.Kind<?>>)params[1];
                        WatchEvent.Modifier[] modifiers =
                            (WatchEvent.Modifier[])params[2];
                        req.release(implRegister(path, events, modifiers));
                        break;
                    }
                    /**
                     * Cancel existing key
                     */
                    case CANCEL: {
                        Object[] params = req.parameters();
                        WatchKey key = (WatchKey)params[0];
                        implCancelKey(key);
                        req.release(null);
                        break;
                    }
                    /**
                     * Close watch service
                     */
                    case CLOSE: {
                        implCloseAll();
                        req.release(null);
                        shutdown = true;
                        break;
                    }

                    default:
                        req.release(new IOException("request not recognized"));
                }
            }
        }
        return shutdown;
    }
}
296