1/*
2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package sun.nio.ch;
27
28import java.nio.channels.spi.AsynchronousChannelProvider;
29import java.io.IOException;
30import java.util.concurrent.ArrayBlockingQueue;
31import java.util.concurrent.RejectedExecutionException;
32import java.util.concurrent.atomic.AtomicInteger;
33import static sun.nio.ch.KQueue.*;
34
35/**
36 * AsynchronousChannelGroup implementation based on the BSD kqueue facility.
37 */
38
39final class KQueuePort
40    extends Port
41{
42    // maximum number of events to poll at a time
43    private static final int MAX_KEVENTS_TO_POLL = 512;
44
45    // kqueue file descriptor
46    private final int kqfd;
47
48    // true if kqueue closed
49    private boolean closed;
50
51    // socket pair used for wakeup
52    private final int sp[];
53
54    // number of wakeups pending
55    private final AtomicInteger wakeupCount = new AtomicInteger();
56
57    // address of the poll array passed to kqueue_wait
58    private final long address;
59
60    // encapsulates an event for a channel
61    static class Event {
62        final PollableChannel channel;
63        final int events;
64
65        Event(PollableChannel channel, int events) {
66            this.channel = channel;
67            this.events = events;
68        }
69
70        PollableChannel channel()   { return channel; }
71        int events()                { return events; }
72    }
73
74    // queue of events for cases that a polling thread dequeues more than one
75    // event
76    private final ArrayBlockingQueue<Event> queue;
77    private final Event NEED_TO_POLL = new Event(null, 0);
78    private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
79
80    KQueuePort(AsynchronousChannelProvider provider, ThreadPool pool)
81        throws IOException
82    {
83        super(provider, pool);
84
85        // open kqueue
86        this.kqfd = kqueue();
87
88        // create socket pair for wakeup mechanism
89        int[] sv = new int[2];
90        try {
91            socketpair(sv);
92
93            // register one end with kqueue
94            keventRegister(kqfd, sv[0], EVFILT_READ, EV_ADD);
95        } catch (IOException x) {
96            close0(kqfd);
97            throw x;
98        }
99        this.sp = sv;
100
101        // allocate the poll array
102        this.address = allocatePollArray(MAX_KEVENTS_TO_POLL);
103
104        // create the queue and offer the special event to ensure that the first
105        // threads polls
106        this.queue = new ArrayBlockingQueue<Event>(MAX_KEVENTS_TO_POLL);
107        this.queue.offer(NEED_TO_POLL);
108    }
109
110    KQueuePort start() {
111        startThreads(new EventHandlerTask());
112        return this;
113    }
114
115    /**
116     * Release all resources
117     */
118    private void implClose() {
119        synchronized (this) {
120            if (closed)
121                return;
122            closed = true;
123        }
124        freePollArray(address);
125        close0(sp[0]);
126        close0(sp[1]);
127        close0(kqfd);
128    }
129
130    private void wakeup() {
131        if (wakeupCount.incrementAndGet() == 1) {
132            // write byte to socketpair to force wakeup
133            try {
134                interrupt(sp[1]);
135            } catch (IOException x) {
136                throw new AssertionError(x);
137            }
138        }
139    }
140
141    @Override
142    void executeOnHandlerTask(Runnable task) {
143        synchronized (this) {
144            if (closed)
145                throw new RejectedExecutionException();
146            offerTask(task);
147            wakeup();
148        }
149    }
150
151    @Override
152    void shutdownHandlerTasks() {
153        /*
154         * If no tasks are running then just release resources; otherwise
155         * write to the one end of the socketpair to wakeup any polling threads.
156         */
157        int nThreads = threadCount();
158        if (nThreads == 0) {
159            implClose();
160        } else {
161            // send interrupt to each thread
162            while (nThreads-- > 0) {
163                wakeup();
164            }
165        }
166    }
167
168    // invoked by clients to register a file descriptor
169    @Override
170    void startPoll(int fd, int events) {
171        // We use a separate filter for read and write events.
172        // TBD: Measure cost of EV_ONESHOT vs. EV_CLEAR, either will do here.
173        int err = 0;
174        int flags = (EV_ADD|EV_ONESHOT);
175        if ((events & Net.POLLIN) > 0)
176            err = keventRegister(kqfd, fd, EVFILT_READ, flags);
177        if (err == 0 && (events & Net.POLLOUT) > 0)
178            err = keventRegister(kqfd, fd, EVFILT_WRITE, flags);
179        if (err != 0)
180            throw new InternalError("kevent failed: " + err);  // should not happen
181    }
182
183    /*
184     * Task to process events from kqueue and dispatch to the channel's
185     * onEvent handler.
186     *
187     * Events are retreived from kqueue in batch and offered to a BlockingQueue
188     * where they are consumed by handler threads. A special "NEED_TO_POLL"
189     * event is used to signal one consumer to re-poll when all events have
190     * been consumed.
191     */
192    private class EventHandlerTask implements Runnable {
193        private Event poll() throws IOException {
194            try {
195                for (;;) {
196                    int n = keventPoll(kqfd, address, MAX_KEVENTS_TO_POLL);
197                    /*
198                     * 'n' events have been read. Here we map them to their
199                     * corresponding channel in batch and queue n-1 so that
200                     * they can be handled by other handler threads. The last
201                     * event is handled by this thread (and so is not queued).
202                     */
203                    fdToChannelLock.readLock().lock();
204                    try {
205                        while (n-- > 0) {
206                            long keventAddress = getEvent(address, n);
207                            int fd = getDescriptor(keventAddress);
208
209                            // wakeup
210                            if (fd == sp[0]) {
211                                if (wakeupCount.decrementAndGet() == 0) {
212                                    // no more wakeups so drain pipe
213                                    drain1(sp[0]);
214                                }
215
216                                // queue special event if there are more events
217                                // to handle.
218                                if (n > 0) {
219                                    queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
220                                    continue;
221                                }
222                                return EXECUTE_TASK_OR_SHUTDOWN;
223                            }
224
225                            PollableChannel channel = fdToChannel.get(fd);
226                            if (channel != null) {
227                                int filter = getFilter(keventAddress);
228                                int events = 0;
229                                if (filter == EVFILT_READ)
230                                    events = Net.POLLIN;
231                                else if (filter == EVFILT_WRITE)
232                                    events = Net.POLLOUT;
233
234                                Event ev = new Event(channel, events);
235
236                                // n-1 events are queued; This thread handles
237                                // the last one except for the wakeup
238                                if (n > 0) {
239                                    queue.offer(ev);
240                                } else {
241                                    return ev;
242                                }
243                            }
244                        }
245                    } finally {
246                        fdToChannelLock.readLock().unlock();
247                    }
248                }
249            } finally {
250                // to ensure that some thread will poll when all events have
251                // been consumed
252                queue.offer(NEED_TO_POLL);
253            }
254        }
255
256        public void run() {
257            Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
258                Invoker.getGroupAndInvokeCount();
259            final boolean isPooledThread = (myGroupAndInvokeCount != null);
260            boolean replaceMe = false;
261            Event ev;
262            try {
263                for (;;) {
264                    // reset invoke count
265                    if (isPooledThread)
266                        myGroupAndInvokeCount.resetInvokeCount();
267
268                    try {
269                        replaceMe = false;
270                        ev = queue.take();
271
272                        // no events and this thread has been "selected" to
273                        // poll for more.
274                        if (ev == NEED_TO_POLL) {
275                            try {
276                                ev = poll();
277                            } catch (IOException x) {
278                                x.printStackTrace();
279                                return;
280                            }
281                        }
282                    } catch (InterruptedException x) {
283                        continue;
284                    }
285
286                    // handle wakeup to execute task or shutdown
287                    if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
288                        Runnable task = pollTask();
289                        if (task == null) {
290                            // shutdown request
291                            return;
292                        }
293                        // run task (may throw error/exception)
294                        replaceMe = true;
295                        task.run();
296                        continue;
297                    }
298
299                    // process event
300                    try {
301                        ev.channel().onEvent(ev.events(), isPooledThread);
302                    } catch (Error x) {
303                        replaceMe = true; throw x;
304                    } catch (RuntimeException x) {
305                        replaceMe = true; throw x;
306                    }
307                }
308            } finally {
309                // last handler to exit when shutdown releases resources
310                int remaining = threadExit(this, replaceMe);
311                if (remaining == 0 && isShutdown()) {
312                    implClose();
313                }
314            }
315        }
316    }
317
318    // -- Native methods --
319
320    private static native void socketpair(int[] sv) throws IOException;
321
322    private static native void interrupt(int fd) throws IOException;
323
324    private static native void drain1(int fd) throws IOException;
325
326    private static native void close0(int fd);
327
328    static {
329        IOUtil.load();
330    }
331}
332