1/*
2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package sun.nio.ch;
27
28import java.nio.channels.spi.AsynchronousChannelProvider;
29import java.util.concurrent.RejectedExecutionException;
30import java.io.IOException;
31import jdk.internal.misc.Unsafe;
32
33/**
34 * Provides an AsynchronousChannelGroup implementation based on the Solaris 10
35 * event port framework and also provides direct access to that framework.
36 */
37
38class SolarisEventPort
39    extends Port
40{
41    private static final Unsafe unsafe = Unsafe.getUnsafe();
42    private static final int addressSize = unsafe.addressSize();
43
44    private static int dependsArch(int value32, int value64) {
45        return (addressSize == 4) ? value32 : value64;
46    }
47
48    /*
49     * typedef struct port_event {
50     *     int             portev_events;
51     *     ushort_t        portev_source;
52     *     ushort_t        portev_pad;
53     *     uintptr_t       portev_object;
54     *     void            *portev_user;
55     * } port_event_t;
56     */
57    static final int SIZEOF_PORT_EVENT  = dependsArch(16, 24);
58    static final int OFFSETOF_EVENTS    = 0;
59    static final int OFFSETOF_SOURCE    = 4;
60    static final int OFFSETOF_OBJECT    = 8;
61
62    // port sources
63    static final short PORT_SOURCE_USER     = 3;
64    static final short PORT_SOURCE_FD       = 4;
65
66    // file descriptor to event port.
67    private final int port;
68
69    // true when port is closed
70    private boolean closed;
71
72    SolarisEventPort(AsynchronousChannelProvider provider, ThreadPool pool)
73        throws IOException
74    {
75        super(provider, pool);
76
77        // create event port
78        this.port = port_create();
79    }
80
81    SolarisEventPort start() {
82        startThreads(new EventHandlerTask());
83        return this;
84    }
85
86    // releass resources
87    private void implClose() {
88        synchronized (this) {
89            if (closed)
90                return;
91            closed = true;
92        }
93        port_close(port);
94    }
95
96    private void wakeup() {
97        try {
98            port_send(port, 0);
99        } catch (IOException x) {
100            throw new AssertionError(x);
101        }
102    }
103
104    @Override
105    void executeOnHandlerTask(Runnable task) {
106        synchronized (this) {
107            if (closed)
108                throw new RejectedExecutionException();
109            offerTask(task);
110            wakeup();
111        }
112    }
113
114    @Override
115    void shutdownHandlerTasks() {
116       /*
117         * If no tasks are running then just release resources; otherwise
118         * write to the one end of the socketpair to wakeup any polling threads..
119         */
120        int nThreads = threadCount();
121        if (nThreads == 0) {
122            implClose();
123        } else {
124            // send user event to wakeup each thread
125            while (nThreads-- > 0) {
126                try {
127                    port_send(port, 0);
128                } catch (IOException x) {
129                    throw new AssertionError(x);
130                }
131            }
132        }
133    }
134
135    @Override
136    void startPoll(int fd, int events) {
137        // (re-)associate file descriptor
138        // no need to translate events
139        try {
140            port_associate(port, PORT_SOURCE_FD, fd, events);
141        } catch (IOException x) {
142            throw new AssertionError();     // should not happen
143        }
144    }
145
146    /*
147     * Task to read a single event from the port and dispatch it to the
148     * channel's onEvent handler.
149     */
150    private class EventHandlerTask implements Runnable {
151        public void run() {
152            Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
153                Invoker.getGroupAndInvokeCount();
154            final boolean isPooledThread = (myGroupAndInvokeCount != null);
155            boolean replaceMe = false;
156            long address = unsafe.allocateMemory(SIZEOF_PORT_EVENT);
157            try {
158                for (;;) {
159                    // reset invoke count
160                    if (isPooledThread)
161                        myGroupAndInvokeCount.resetInvokeCount();
162
163                    // wait for I/O completion event
164                    // A error here is fatal (thread will not be replaced)
165                    replaceMe = false;
166                    try {
167                        port_get(port, address);
168                    } catch (IOException x) {
169                        x.printStackTrace();
170                        return;
171                    }
172
173                    // event source
174                    short source = unsafe.getShort(address + OFFSETOF_SOURCE);
175                    if (source != PORT_SOURCE_FD) {
176                        // user event is trigger to invoke task or shutdown
177                        if (source == PORT_SOURCE_USER) {
178                            Runnable task = pollTask();
179                            if (task == null) {
180                                // shutdown request
181                                return;
182                            }
183                            // run task (may throw error/exception)
184                            replaceMe = true;
185                            task.run();
186                        }
187                        // ignore
188                        continue;
189                    }
190
191                    // pe->portev_object is file descriptor
192                    int fd = (int)unsafe.getAddress(address + OFFSETOF_OBJECT);
193                    // pe->portev_events
194                    int events = unsafe.getInt(address + OFFSETOF_EVENTS);
195
196                    // lookup channel
197                    PollableChannel ch;
198                    fdToChannelLock.readLock().lock();
199                    try {
200                        ch = fdToChannel.get(fd);
201                    } finally {
202                        fdToChannelLock.readLock().unlock();
203                    }
204
205                    // notify channel
206                    if (ch != null) {
207                        replaceMe = true;
208                        // no need to translate events
209                        ch.onEvent(events, isPooledThread);
210                    }
211                }
212            } finally {
213                // free per-thread resources
214                unsafe.freeMemory(address);
215                // last task to exit when shutdown release resources
216                int remaining = threadExit(this, replaceMe);
217                if (remaining == 0 && isShutdown())
218                    implClose();
219            }
220        }
221    }
222
223    /**
224     * Creates an event port
225     */
226    static native int port_create() throws IOException;
227
228    /**
229     * Associates specific events of a given object with a port
230     */
231    static native boolean port_associate(int port, int source, long object, int events)
232        throws IOException;
233
234    /**
235     * Removes the association of an object with a port.
236     */
237    static native boolean port_dissociate(int port, int source, long object)
238        throws IOException;
239
240    /**
241     * Retrieves a single event from a port
242     */
243    static native void port_get(int port, long pe) throws IOException;
244
245    /**
246     * Retrieves at most {@code max} events from a port.
247     */
248    static native int port_getn(int port, long address, int max, long timeout)
249        throws IOException;
250
251    /**
252     * Sends a user-defined eventto a specified  port.
253     */
254    static native void port_send(int port, int events) throws IOException;
255
256    /**
257     * Closes a port.
258     */
259    static native void port_close(int port);
260
261
262    static {
263        IOUtil.load();
264    }
265}
266