1/*
2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package sun.nio.ch;
27
28import java.nio.channels.spi.AsynchronousChannelProvider;
29import java.io.IOException;
30import java.util.concurrent.ArrayBlockingQueue;
31import java.util.concurrent.RejectedExecutionException;
32import java.util.concurrent.atomic.AtomicInteger;
33import static sun.nio.ch.EPoll.*;
34
35/**
36 * AsynchronousChannelGroup implementation based on the Linux epoll facility.
37 */
38
39final class EPollPort
40    extends Port
41{
42    // maximum number of events to poll at a time
43    private static final int MAX_EPOLL_EVENTS = 512;
44
45    // errors
46    private static final int ENOENT     = 2;
47
48    // epoll file descriptor
49    private final int epfd;
50
51    // true if epoll closed
52    private boolean closed;
53
54    // socket pair used for wakeup
55    private final int sp[];
56
57    // number of wakeups pending
58    private final AtomicInteger wakeupCount = new AtomicInteger();
59
60    // address of the poll array passed to epoll_wait
61    private final long address;
62
63    // encapsulates an event for a channel
64    static class Event {
65        final PollableChannel channel;
66        final int events;
67
68        Event(PollableChannel channel, int events) {
69            this.channel = channel;
70            this.events = events;
71        }
72
73        PollableChannel channel()   { return channel; }
74        int events()                { return events; }
75    }
76
77    // queue of events for cases that a polling thread dequeues more than one
78    // event
79    private final ArrayBlockingQueue<Event> queue;
80    private final Event NEED_TO_POLL = new Event(null, 0);
81    private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
82
83    EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
84        throws IOException
85    {
86        super(provider, pool);
87
88        // open epoll
89        this.epfd = epollCreate();
90
91        // create socket pair for wakeup mechanism
92        int[] sv = new int[2];
93        try {
94            socketpair(sv);
95            // register one end with epoll
96            epollCtl(epfd, EPOLL_CTL_ADD, sv[0], Net.POLLIN);
97        } catch (IOException x) {
98            close0(epfd);
99            throw x;
100        }
101        this.sp = sv;
102
103        // allocate the poll array
104        this.address = allocatePollArray(MAX_EPOLL_EVENTS);
105
106        // create the queue and offer the special event to ensure that the first
107        // threads polls
108        this.queue = new ArrayBlockingQueue<>(MAX_EPOLL_EVENTS);
109        this.queue.offer(NEED_TO_POLL);
110    }
111
112    EPollPort start() {
113        startThreads(new EventHandlerTask());
114        return this;
115    }
116
117    /**
118     * Release all resources
119     */
120    private void implClose() {
121        synchronized (this) {
122            if (closed)
123                return;
124            closed = true;
125        }
126        freePollArray(address);
127        close0(sp[0]);
128        close0(sp[1]);
129        close0(epfd);
130    }
131
132    private void wakeup() {
133        if (wakeupCount.incrementAndGet() == 1) {
134            // write byte to socketpair to force wakeup
135            try {
136                interrupt(sp[1]);
137            } catch (IOException x) {
138                throw new AssertionError(x);
139            }
140        }
141    }
142
143    @Override
144    void executeOnHandlerTask(Runnable task) {
145        synchronized (this) {
146            if (closed)
147                throw new RejectedExecutionException();
148            offerTask(task);
149            wakeup();
150        }
151    }
152
153    @Override
154    void shutdownHandlerTasks() {
155        /*
156         * If no tasks are running then just release resources; otherwise
157         * write to the one end of the socketpair to wakeup any polling threads.
158         */
159        int nThreads = threadCount();
160        if (nThreads == 0) {
161            implClose();
162        } else {
163            // send interrupt to each thread
164            while (nThreads-- > 0) {
165                wakeup();
166            }
167        }
168    }
169
170    // invoke by clients to register a file descriptor
171    @Override
172    void startPoll(int fd, int events) {
173        // update events (or add to epoll on first usage)
174        int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
175        if (err == ENOENT)
176            err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
177        if (err != 0)
178            throw new AssertionError();     // should not happen
179    }
180
181    /*
182     * Task to process events from epoll and dispatch to the channel's
183     * onEvent handler.
184     *
185     * Events are retreived from epoll in batch and offered to a BlockingQueue
186     * where they are consumed by handler threads. A special "NEED_TO_POLL"
187     * event is used to signal one consumer to re-poll when all events have
188     * been consumed.
189     */
190    private class EventHandlerTask implements Runnable {
191        private Event poll() throws IOException {
192            try {
193                for (;;) {
194                    int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);
195                    /*
196                     * 'n' events have been read. Here we map them to their
197                     * corresponding channel in batch and queue n-1 so that
198                     * they can be handled by other handler threads. The last
199                     * event is handled by this thread (and so is not queued).
200                     */
201                    fdToChannelLock.readLock().lock();
202                    try {
203                        while (n-- > 0) {
204                            long eventAddress = getEvent(address, n);
205                            int fd = getDescriptor(eventAddress);
206
207                            // wakeup
208                            if (fd == sp[0]) {
209                                if (wakeupCount.decrementAndGet() == 0) {
210                                    // no more wakeups so drain pipe
211                                    drain1(sp[0]);
212                                }
213
214                                // queue special event if there are more events
215                                // to handle.
216                                if (n > 0) {
217                                    queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
218                                    continue;
219                                }
220                                return EXECUTE_TASK_OR_SHUTDOWN;
221                            }
222
223                            PollableChannel channel = fdToChannel.get(fd);
224                            if (channel != null) {
225                                int events = getEvents(eventAddress);
226                                Event ev = new Event(channel, events);
227
228                                // n-1 events are queued; This thread handles
229                                // the last one except for the wakeup
230                                if (n > 0) {
231                                    queue.offer(ev);
232                                } else {
233                                    return ev;
234                                }
235                            }
236                        }
237                    } finally {
238                        fdToChannelLock.readLock().unlock();
239                    }
240                }
241            } finally {
242                // to ensure that some thread will poll when all events have
243                // been consumed
244                queue.offer(NEED_TO_POLL);
245            }
246        }
247
248        public void run() {
249            Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
250                Invoker.getGroupAndInvokeCount();
251            final boolean isPooledThread = (myGroupAndInvokeCount != null);
252            boolean replaceMe = false;
253            Event ev;
254            try {
255                for (;;) {
256                    // reset invoke count
257                    if (isPooledThread)
258                        myGroupAndInvokeCount.resetInvokeCount();
259
260                    try {
261                        replaceMe = false;
262                        ev = queue.take();
263
264                        // no events and this thread has been "selected" to
265                        // poll for more.
266                        if (ev == NEED_TO_POLL) {
267                            try {
268                                ev = poll();
269                            } catch (IOException x) {
270                                x.printStackTrace();
271                                return;
272                            }
273                        }
274                    } catch (InterruptedException x) {
275                        continue;
276                    }
277
278                    // handle wakeup to execute task or shutdown
279                    if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
280                        Runnable task = pollTask();
281                        if (task == null) {
282                            // shutdown request
283                            return;
284                        }
285                        // run task (may throw error/exception)
286                        replaceMe = true;
287                        task.run();
288                        continue;
289                    }
290
291                    // process event
292                    try {
293                        ev.channel().onEvent(ev.events(), isPooledThread);
294                    } catch (Error x) {
295                        replaceMe = true; throw x;
296                    } catch (RuntimeException x) {
297                        replaceMe = true; throw x;
298                    }
299                }
300            } finally {
301                // last handler to exit when shutdown releases resources
302                int remaining = threadExit(this, replaceMe);
303                if (remaining == 0 && isShutdown()) {
304                    implClose();
305                }
306            }
307        }
308    }
309
310    // -- Native methods --
311
312    private static native void socketpair(int[] sv) throws IOException;
313
314    private static native void interrupt(int fd) throws IOException;
315
316    private static native void drain1(int fd) throws IOException;
317
318    private static native void close0(int fd);
319
320    static {
321        IOUtil.load();
322    }
323}
324