1/*
2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
3 * Copyright (c) 2012 SAP SE. All rights reserved.
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This code is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License version 2 only, as
8 * published by the Free Software Foundation.  Oracle designates this
9 * particular file as subject to the "Classpath" exception as provided
10 * by Oracle in the LICENSE file that accompanied this code.
11 *
12 * This code is distributed in the hope that it will be useful, but WITHOUT
13 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
15 * version 2 for more details (a copy is included in the LICENSE file that
16 * accompanied this code).
17 *
18 * You should have received a copy of the GNU General Public License version
19 * 2 along with this work; if not, write to the Free Software Foundation,
20 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
21 *
22 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
23 * or visit www.oracle.com if you need additional information or have any
24 * questions.
25 */
26
27package sun.nio.ch;
28
29import java.nio.channels.spi.AsynchronousChannelProvider;
30import java.io.IOException;
31import java.util.HashSet;
32import java.util.Iterator;
33import java.util.concurrent.ArrayBlockingQueue;
34import java.util.concurrent.RejectedExecutionException;
35import java.util.concurrent.atomic.AtomicInteger;
36import java.util.concurrent.locks.ReentrantLock;
37import jdk.internal.misc.Unsafe;
38
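/**
 * {@code AsynchronousChannelGroup} implementation based on the AIX pollset
 * framework. Besides the channels' file descriptors, two socket pairs are
 * registered with the pollset: one to wake up polling threads and one to
 * signal that pollset control requests are pending.
 */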
42final class AixPollPort
43    extends Port
44{
45    private static final Unsafe unsafe = Unsafe.getUnsafe();
46
47    static {
48        IOUtil.load();
49        init();
50    }
51
52    /**
53     * struct pollfd {
54     *     int fd;
55     *     short events;
56     *     short revents;
57     * }
58     */
59    private static final int SIZEOF_POLLFD    = eventSize();
60    private static final int OFFSETOF_EVENTS  = eventsOffset();
61    private static final int OFFSETOF_REVENTS = reventsOffset();
62    private static final int OFFSETOF_FD      = fdOffset();
63
64    // opcodes
65    private static final int PS_ADD     = 0x0;
66    private static final int PS_MOD     = 0x1;
67    private static final int PS_DELETE  = 0x2;
68
69    // maximum number of events to poll at a time
70    private static final int MAX_POLL_EVENTS = 512;
71
72    // pollset ID
73    private final int pollset;
74
75    // true if port is closed
76    private boolean closed;
77
78    // socket pair used for wakeup
79    private final int sp[];
80
81    // socket pair used to indicate pending pollsetCtl calls
82    // Background info: pollsetCtl blocks when another thread is in a pollsetPoll call.
83    private final int ctlSp[];
84
85    // number of wakeups pending
86    private final AtomicInteger wakeupCount = new AtomicInteger();
87
88    // address of the poll array passed to pollset_poll
89    private final long address;
90
91    // encapsulates an event for a channel
92    static class Event {
93        final PollableChannel channel;
94        final int events;
95
96        Event(PollableChannel channel, int events) {
97            this.channel = channel;
98            this.events = events;
99        }
100
101        PollableChannel channel()   { return channel; }
102        int events()                { return events; }
103    }
104
105    // queue of events for cases that a polling thread dequeues more than one
106    // event
107    private final ArrayBlockingQueue<Event> queue;
108    private final Event NEED_TO_POLL = new Event(null, 0);
109    private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
110    private final Event CONTINUE_AFTER_CTL_EVENT = new Event(null, 0);
111
112    // encapsulates a pollset control event for a file descriptor
113    static class ControlEvent {
114        final int fd;
115        final int events;
116        final boolean removeOnly;
117        int error = 0;
118
119        ControlEvent(int fd, int events, boolean removeOnly) {
120            this.fd = fd;
121            this.events = events;
122            this.removeOnly = removeOnly;
123        }
124
125        int fd()                 { return fd; }
126        int events()             { return events; }
127        boolean removeOnly()     { return removeOnly; }
128        int error()              { return error; }
129        void setError(int error) { this.error = error; }
130    }
131
132    // queue of control events that need to be processed
133    // (this object is also used for synchronization)
134    private final HashSet<ControlEvent> controlQueue = new HashSet<ControlEvent>();
135
136    // lock used to check whether a poll operation is ongoing
137    private final ReentrantLock controlLock = new ReentrantLock();
138
139    AixPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
140        throws IOException
141    {
142        super(provider, pool);
143
144        // open pollset
145        this.pollset = pollsetCreate();
146
147        // create socket pair for wakeup mechanism
148        int[] sv = new int[2];
149        try {
150            socketpair(sv);
151            // register one end with pollset
152            pollsetCtl(pollset, PS_ADD, sv[0], Net.POLLIN);
153        } catch (IOException x) {
154            pollsetDestroy(pollset);
155            throw x;
156        }
157        this.sp = sv;
158
159        // create socket pair for pollset control mechanism
160        sv = new int[2];
161        try {
162            socketpair(sv);
163            // register one end with pollset
164            pollsetCtl(pollset, PS_ADD, sv[0], Net.POLLIN);
165        } catch (IOException x) {
166            pollsetDestroy(pollset);
167            throw x;
168        }
169        this.ctlSp = sv;
170
171        // allocate the poll array
172        this.address = allocatePollArray(MAX_POLL_EVENTS);
173
174        // create the queue and offer the special event to ensure that the first
175        // threads polls
176        this.queue = new ArrayBlockingQueue<Event>(MAX_POLL_EVENTS);
177        this.queue.offer(NEED_TO_POLL);
178    }
179
180    AixPollPort start() {
181        startThreads(new EventHandlerTask());
182        return this;
183    }
184
185    /**
186     * Release all resources
187     */
188    private void implClose() {
189        synchronized (this) {
190            if (closed)
191                return;
192            closed = true;
193        }
194        freePollArray(address);
195        close0(sp[0]);
196        close0(sp[1]);
197        close0(ctlSp[0]);
198        close0(ctlSp[1]);
199        pollsetDestroy(pollset);
200    }
201
202    private void wakeup() {
203        if (wakeupCount.incrementAndGet() == 1) {
204            // write byte to socketpair to force wakeup
205            try {
206                interrupt(sp[1]);
207            } catch (IOException x) {
208                throw new AssertionError(x);
209            }
210        }
211    }
212
213    @Override
214    void executeOnHandlerTask(Runnable task) {
215        synchronized (this) {
216            if (closed)
217                throw new RejectedExecutionException();
218            offerTask(task);
219            wakeup();
220        }
221    }
222
223    @Override
224    void shutdownHandlerTasks() {
225        /*
226         * If no tasks are running then just release resources; otherwise
227         * write to the one end of the socketpair to wakeup any polling threads.
228         */
229        int nThreads = threadCount();
230        if (nThreads == 0) {
231            implClose();
232        } else {
233            // send interrupt to each thread
234            while (nThreads-- > 0) {
235                wakeup();
236            }
237        }
238    }
239
240    // invoke by clients to register a file descriptor
241    @Override
242    void startPoll(int fd, int events) {
243        queueControlEvent(new ControlEvent(fd, events, false));
244    }
245
246    // Callback method for implementations that need special handling when fd is removed
247    @Override
248    protected void preUnregister(int fd) {
249        queueControlEvent(new ControlEvent(fd, 0, true));
250    }
251
252    // Add control event into queue and wait for completion.
253    // In case the control lock is free, this method also tries to apply the control change directly.
254    private void queueControlEvent(ControlEvent ev) {
255        // pollsetCtl blocks when a poll call is ongoing. This is very probable.
256        // Therefore we let the polling thread do the pollsetCtl call.
257        synchronized (controlQueue) {
258            controlQueue.add(ev);
259            // write byte to socketpair to force wakeup
260            try {
261                interrupt(ctlSp[1]);
262            } catch (IOException x) {
263                throw new AssertionError(x);
264            }
265            do {
266                // Directly empty queue if no poll call is ongoing.
267                if (controlLock.tryLock()) {
268                    try {
269                        processControlQueue();
270                    } finally {
271                        controlLock.unlock();
272                    }
273                } else {
274                    try {
275                        // Do not starve in case the polling thread returned before
276                        // we could write to ctlSp[1] but the polling thread did not
277                        // release the control lock until we checked. Therefore, use
278                        // a timed wait for the time being.
279                        controlQueue.wait(100);
280                    } catch (InterruptedException e) {
281                        // ignore exception and try again
282                    }
283                }
284            } while (controlQueue.contains(ev));
285        }
286        if (ev.error() != 0) {
287            throw new AssertionError();
288        }
289    }
290
291    // Process all events currently stored in the control queue.
292    private void processControlQueue() {
293        synchronized (controlQueue) {
294            // On Aix it is only possible to set the event
295            // bits on the first call of pollsetCtl. Later
296            // calls only add bits, but cannot remove them.
297            // Therefore, we always remove the file
298            // descriptor ignoring the error and then add it.
299            Iterator<ControlEvent> iter = controlQueue.iterator();
300            while (iter.hasNext()) {
301                ControlEvent ev = iter.next();
302                pollsetCtl(pollset, PS_DELETE, ev.fd(), 0);
303                if (!ev.removeOnly()) {
304                    ev.setError(pollsetCtl(pollset, PS_MOD, ev.fd(), ev.events()));
305                }
306                iter.remove();
307            }
308            controlQueue.notifyAll();
309        }
310    }
311
312    /*
313     * Task to process events from pollset and dispatch to the channel's
314     * onEvent handler.
315     *
316     * Events are retreived from pollset in batch and offered to a BlockingQueue
317     * where they are consumed by handler threads. A special "NEED_TO_POLL"
318     * event is used to signal one consumer to re-poll when all events have
319     * been consumed.
320     */
321    private class EventHandlerTask implements Runnable {
322        private Event poll() throws IOException {
323            try {
324                for (;;) {
325                    int n;
326                    controlLock.lock();
327                    try {
328                        n = pollsetPoll(pollset, address, MAX_POLL_EVENTS);
329                    } finally {
330                        controlLock.unlock();
331                    }
332                    /*
333                     * 'n' events have been read. Here we map them to their
334                     * corresponding channel in batch and queue n-1 so that
335                     * they can be handled by other handler threads. The last
336                     * event is handled by this thread (and so is not queued).
337                     */
338                    fdToChannelLock.readLock().lock();
339                    try {
340                        while (n-- > 0) {
341                            long eventAddress = getEvent(address, n);
342                            int fd = getDescriptor(eventAddress);
343
344                            // To emulate one shot semantic we need to remove
345                            // the file descriptor here.
346                            if (fd != sp[0] && fd != ctlSp[0]) {
347                                synchronized (controlQueue) {
348                                    pollsetCtl(pollset, PS_DELETE, fd, 0);
349                                }
350                            }
351
352                            // wakeup
353                            if (fd == sp[0]) {
354                                if (wakeupCount.decrementAndGet() == 0) {
355                                    // no more wakeups so drain pipe
356                                    drain1(sp[0]);
357                                }
358
359                                // queue special event if there are more events
360                                // to handle.
361                                if (n > 0) {
362                                    queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
363                                    continue;
364                                }
365                                return EXECUTE_TASK_OR_SHUTDOWN;
366                            }
367
368                            // wakeup to process control event
369                            if (fd == ctlSp[0]) {
370                                synchronized (controlQueue) {
371                                    drain1(ctlSp[0]);
372                                    processControlQueue();
373                                }
374                                if (n > 0) {
375                                    continue;
376                                }
377                                return CONTINUE_AFTER_CTL_EVENT;
378                            }
379
380                            PollableChannel channel = fdToChannel.get(fd);
381                            if (channel != null) {
382                                int events = getRevents(eventAddress);
383                                Event ev = new Event(channel, events);
384
385                                // n-1 events are queued; This thread handles
386                                // the last one except for the wakeup
387                                if (n > 0) {
388                                    queue.offer(ev);
389                                } else {
390                                    return ev;
391                                }
392                            }
393                        }
394                    } finally {
395                        fdToChannelLock.readLock().unlock();
396                    }
397                }
398            } finally {
399                // to ensure that some thread will poll when all events have
400                // been consumed
401                queue.offer(NEED_TO_POLL);
402            }
403        }
404
405        public void run() {
406            Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
407                Invoker.getGroupAndInvokeCount();
408            final boolean isPooledThread = (myGroupAndInvokeCount != null);
409            boolean replaceMe = false;
410            Event ev;
411            try {
412                for (;;) {
413                    // reset invoke count
414                    if (isPooledThread)
415                        myGroupAndInvokeCount.resetInvokeCount();
416
417                    try {
418                        replaceMe = false;
419                        ev = queue.take();
420
421                        // no events and this thread has been "selected" to
422                        // poll for more.
423                        if (ev == NEED_TO_POLL) {
424                            try {
425                                ev = poll();
426                            } catch (IOException x) {
427                                x.printStackTrace();
428                                return;
429                            }
430                        }
431                    } catch (InterruptedException x) {
432                        continue;
433                    }
434
435                    // contine after we processed a control event
436                    if (ev == CONTINUE_AFTER_CTL_EVENT) {
437                        continue;
438                    }
439
440                    // handle wakeup to execute task or shutdown
441                    if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
442                        Runnable task = pollTask();
443                        if (task == null) {
444                            // shutdown request
445                            return;
446                        }
447                        // run task (may throw error/exception)
448                        replaceMe = true;
449                        task.run();
450                        continue;
451                    }
452
453                    // process event
454                    try {
455                        ev.channel().onEvent(ev.events(), isPooledThread);
456                    } catch (Error x) {
457                        replaceMe = true; throw x;
458                    } catch (RuntimeException x) {
459                        replaceMe = true; throw x;
460                    }
461                }
462            } finally {
463                // last handler to exit when shutdown releases resources
464                int remaining = threadExit(this, replaceMe);
465                if (remaining == 0 && isShutdown()) {
466                    implClose();
467                }
468            }
469        }
470    }
471
472    /**
473     * Allocates a poll array to handle up to {@code count} events.
474     */
475    private static long allocatePollArray(int count) {
476        return unsafe.allocateMemory(count * SIZEOF_POLLFD);
477    }
478
479    /**
480     * Free a poll array
481     */
482    private static void freePollArray(long address) {
483        unsafe.freeMemory(address);
484    }
485
486    /**
487     * Returns event[i];
488     */
489    private static long getEvent(long address, int i) {
490        return address + (SIZEOF_POLLFD*i);
491    }
492
493    /**
494     * Returns event->fd
495     */
496    private static int getDescriptor(long eventAddress) {
497        return unsafe.getInt(eventAddress + OFFSETOF_FD);
498    }
499
500    /**
501     * Returns event->events
502     */
503    private static int getEvents(long eventAddress) {
504        return unsafe.getChar(eventAddress + OFFSETOF_EVENTS);
505    }
506
507    /**
508     * Returns event->revents
509     */
510    private static int getRevents(long eventAddress) {
511        return unsafe.getChar(eventAddress + OFFSETOF_REVENTS);
512    }
513
514    // -- Native methods --
515
516    private static native void init();
517
518    private static native int eventSize();
519
520    private static native int eventsOffset();
521
522    private static native int reventsOffset();
523
524    private static native int fdOffset();
525
526    private static native int pollsetCreate() throws IOException;
527
528    private static native int pollsetCtl(int pollset, int opcode, int fd, int events);
529
530    private static native int pollsetPoll(int pollset, long pollAddress, int numfds)
531        throws IOException;
532
533    private static native void pollsetDestroy(int pollset);
534
535    private static native void socketpair(int[] sv) throws IOException;
536
537    private static native void interrupt(int fd) throws IOException;
538
539    private static native void drain1(int fd) throws IOException;
540
541    private static native void close0(int fd);
542}
543