1/*
2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package sun.nio.ch;
27
28import java.nio.channels.*;
29import java.nio.channels.spi.AsynchronousChannelProvider;
30import java.io.Closeable;
31import java.io.IOException;
32import java.io.FileDescriptor;
33import java.util.*;
34import java.util.concurrent.*;
35import java.util.concurrent.locks.ReadWriteLock;
36import java.util.concurrent.locks.ReentrantReadWriteLock;
37import jdk.internal.misc.Unsafe;
38
39/**
40 * Windows implementation of AsynchronousChannelGroup encapsulating an I/O
41 * completion port.
42 */
43
44class Iocp extends AsynchronousChannelGroupImpl {
45    private static final Unsafe unsafe = Unsafe.getUnsafe();
46    private static final long INVALID_HANDLE_VALUE  = -1L;
47
48    // maps completion key to channel
49    private final ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock();
50    private final Map<Integer,OverlappedChannel> keyToChannel =
51        new HashMap<Integer,OverlappedChannel>();
52    private int nextCompletionKey;
53
54    // handle to completion port
55    private final long port;
56
57    // true if port has been closed
58    private boolean closed;
59
60    // the set of "stale" OVERLAPPED structures. These OVERLAPPED structures
61    // relate to I/O operations where the completion notification was not
62    // received in a timely manner after the channel is closed.
63    private final Set<Long> staleIoSet = new HashSet<Long>();
64
65    Iocp(AsynchronousChannelProvider provider, ThreadPool pool)
66        throws IOException
67    {
68        super(provider, pool);
69        this.port =
70          createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, fixedThreadCount());
71        this.nextCompletionKey = 1;
72    }
73
74    Iocp start() {
75        startThreads(new EventHandlerTask());
76        return this;
77    }
78
79    /*
80     * Channels implements this interface support overlapped I/O and can be
81     * associated with a completion port.
82     */
83    static interface OverlappedChannel extends Closeable {
84        /**
85         * Returns a reference to the pending I/O result.
86         */
87        <V,A> PendingFuture<V,A> getByOverlapped(long overlapped);
88    }
89
90    // release all resources
91    void implClose() {
92        synchronized (this) {
93            if (closed)
94                return;
95            closed = true;
96        }
97        close0(port);
98        synchronized (staleIoSet) {
99            for (Long ov: staleIoSet) {
100                unsafe.freeMemory(ov);
101            }
102            staleIoSet.clear();
103        }
104    }
105
106    @Override
107    boolean isEmpty() {
108        keyToChannelLock.writeLock().lock();
109        try {
110            return keyToChannel.isEmpty();
111        } finally {
112            keyToChannelLock.writeLock().unlock();
113        }
114    }
115
116    @Override
117    final Object attachForeignChannel(final Channel channel, FileDescriptor fdObj)
118        throws IOException
119    {
120        int key = associate(new OverlappedChannel() {
121            public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
122                return null;
123            }
124            public void close() throws IOException {
125                channel.close();
126            }
127        }, 0L);
128        return Integer.valueOf(key);
129    }
130
131    @Override
132    final void detachForeignChannel(Object key) {
133        disassociate((Integer)key);
134    }
135
136    @Override
137    void closeAllChannels() {
138        /**
139         * On Windows the close operation will close the socket/file handle
140         * and then wait until all outstanding I/O operations have aborted.
141         * This is necessary as each channel's cache of OVERLAPPED structures
142         * can only be freed once all I/O operations have completed. As I/O
143         * completion requires a lookup of the keyToChannel then we must close
144         * the channels when not holding the write lock.
145         */
146        final int MAX_BATCH_SIZE = 32;
147        OverlappedChannel channels[] = new OverlappedChannel[MAX_BATCH_SIZE];
148        int count;
149        do {
150            // grab a batch of up to 32 channels
151            keyToChannelLock.writeLock().lock();
152            count = 0;
153            try {
154                for (Integer key: keyToChannel.keySet()) {
155                    channels[count++] = keyToChannel.get(key);
156                    if (count >= MAX_BATCH_SIZE)
157                        break;
158                }
159            } finally {
160                keyToChannelLock.writeLock().unlock();
161            }
162
163            // close them
164            for (int i=0; i<count; i++) {
165                try {
166                    channels[i].close();
167                } catch (IOException ignore) { }
168            }
169        } while (count > 0);
170    }
171
172    private void wakeup() {
173        try {
174            postQueuedCompletionStatus(port, 0);
175        } catch (IOException e) {
176            // should not happen
177            throw new AssertionError(e);
178        }
179    }
180
181    @Override
182    void executeOnHandlerTask(Runnable task) {
183        synchronized (this) {
184            if (closed)
185                throw new RejectedExecutionException();
186            offerTask(task);
187            wakeup();
188        }
189
190    }
191
192    @Override
193    void shutdownHandlerTasks() {
194        // shutdown all handler threads
195        int nThreads = threadCount();
196        while (nThreads-- > 0) {
197            wakeup();
198        }
199    }
200
201    /**
202     * Associate the given handle with this group
203     */
204    int associate(OverlappedChannel ch, long handle) throws IOException {
205        keyToChannelLock.writeLock().lock();
206
207        // generate a completion key (if not shutdown)
208        int key;
209        try {
210            if (isShutdown())
211                throw new ShutdownChannelGroupException();
212
213            // generate unique key
214            do {
215                key = nextCompletionKey++;
216            } while ((key == 0) || keyToChannel.containsKey(key));
217
218            // associate with I/O completion port
219            if (handle != 0L) {
220                createIoCompletionPort(handle, port, key, 0);
221            }
222
223            // setup mapping
224            keyToChannel.put(key, ch);
225        } finally {
226            keyToChannelLock.writeLock().unlock();
227        }
228        return key;
229    }
230
231    /**
232     * Disassociate channel from the group.
233     */
234    void disassociate(int key) {
235        boolean checkForShutdown = false;
236
237        keyToChannelLock.writeLock().lock();
238        try {
239            keyToChannel.remove(key);
240
241            // last key to be removed so check if group is shutdown
242            if (keyToChannel.isEmpty())
243                checkForShutdown = true;
244
245        } finally {
246            keyToChannelLock.writeLock().unlock();
247        }
248
249        // continue shutdown
250        if (checkForShutdown && isShutdown()) {
251            try {
252                shutdownNow();
253            } catch (IOException ignore) { }
254        }
255    }
256
257    /**
258     * Invoked when a channel associated with this port is closed before
259     * notifications for all outstanding I/O operations have been received.
260     */
261    void makeStale(Long overlapped) {
262        synchronized (staleIoSet) {
263            staleIoSet.add(overlapped);
264        }
265    }
266
267    /**
268     * Checks if the given OVERLAPPED is stale and if so, releases it.
269     */
270    private void checkIfStale(long ov) {
271        synchronized (staleIoSet) {
272            boolean removed = staleIoSet.remove(ov);
273            if (removed) {
274                unsafe.freeMemory(ov);
275            }
276        }
277    }
278
279    /**
280     * The handler for consuming the result of an asynchronous I/O operation.
281     */
282    static interface ResultHandler {
283        /**
284         * Invoked if the I/O operation completes successfully.
285         */
286        public void completed(int bytesTransferred, boolean canInvokeDirect);
287
288        /**
289         * Invoked if the I/O operation fails.
290         */
291        public void failed(int error, IOException ioe);
292    }
293
294    // Creates IOException for the given I/O error.
295    private static IOException translateErrorToIOException(int error) {
296        String msg = getErrorMessage(error);
297        if (msg == null)
298            msg = "Unknown error: 0x0" + Integer.toHexString(error);
299        return new IOException(msg);
300    }
301
302    /**
303     * Long-running task servicing system-wide or per-file completion port
304     */
305    private class EventHandlerTask implements Runnable {
306        public void run() {
307            Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
308                Invoker.getGroupAndInvokeCount();
309            boolean canInvokeDirect = (myGroupAndInvokeCount != null);
310            CompletionStatus ioResult = new CompletionStatus();
311            boolean replaceMe = false;
312
313            try {
314                for (;;) {
315                    // reset invoke count
316                    if (myGroupAndInvokeCount != null)
317                        myGroupAndInvokeCount.resetInvokeCount();
318
319                    // wait for I/O completion event
320                    // A error here is fatal (thread will not be replaced)
321                    replaceMe = false;
322                    try {
323                        getQueuedCompletionStatus(port, ioResult);
324                    } catch (IOException x) {
325                        // should not happen
326                        x.printStackTrace();
327                        return;
328                    }
329
330                    // handle wakeup to execute task or shutdown
331                    if (ioResult.completionKey() == 0 &&
332                        ioResult.overlapped() == 0L)
333                    {
334                        Runnable task = pollTask();
335                        if (task == null) {
336                            // shutdown request
337                            return;
338                        }
339
340                        // run task
341                        // (if error/exception then replace thread)
342                        replaceMe = true;
343                        task.run();
344                        continue;
345                    }
346
347                    // map key to channel
348                    OverlappedChannel ch = null;
349                    keyToChannelLock.readLock().lock();
350                    try {
351                        ch = keyToChannel.get(ioResult.completionKey());
352                        if (ch == null) {
353                            checkIfStale(ioResult.overlapped());
354                            continue;
355                        }
356                    } finally {
357                        keyToChannelLock.readLock().unlock();
358                    }
359
360                    // lookup I/O request
361                    PendingFuture<?,?> result = ch.getByOverlapped(ioResult.overlapped());
362                    if (result == null) {
363                        // we get here if the OVERLAPPED structure is associated
364                        // with an I/O operation on a channel that was closed
365                        // but the I/O operation event wasn't read in a timely
366                        // manner. Alternatively, it may be related to a
367                        // tryLock operation as the OVERLAPPED structures for
368                        // these operations are not in the I/O cache.
369                        checkIfStale(ioResult.overlapped());
370                        continue;
371                    }
372
373                    // synchronize on result in case I/O completed immediately
374                    // and was handled by initiator
375                    synchronized (result) {
376                        if (result.isDone()) {
377                            continue;
378                        }
379                        // not handled by initiator
380                    }
381
382                    // invoke I/O result handler
383                    int error = ioResult.error();
384                    ResultHandler rh = (ResultHandler)result.getContext();
385                    replaceMe = true; // (if error/exception then replace thread)
386                    if (error == 0) {
387                        rh.completed(ioResult.bytesTransferred(), canInvokeDirect);
388                    } else {
389                        rh.failed(error, translateErrorToIOException(error));
390                    }
391                }
392            } finally {
393                // last thread to exit when shutdown releases resources
394                int remaining = threadExit(this, replaceMe);
395                if (remaining == 0 && isShutdown()) {
396                    implClose();
397                }
398            }
399        }
400    }
401
402    /**
403     * Container for data returned by GetQueuedCompletionStatus
404     */
405    private static class CompletionStatus {
406        private int error;
407        private int bytesTransferred;
408        private int completionKey;
409        private long overlapped;
410
411        private CompletionStatus() { }
412        int error() { return error; }
413        int bytesTransferred() { return bytesTransferred; }
414        int completionKey() { return completionKey; }
415        long overlapped() { return overlapped; }
416    }
417
418    // -- native methods --
419
420    private static native void initIDs();
421
422    private static native long createIoCompletionPort(long handle,
423        long existingPort, int completionKey, int concurrency) throws IOException;
424
425    private static native void close0(long handle);
426
427    private static native void getQueuedCompletionStatus(long completionPort,
428        CompletionStatus status) throws IOException;
429
430    private static native void postQueuedCompletionStatus(long completionPort,
431        int completionKey) throws IOException;
432
433    private static native String getErrorMessage(int error);
434
435    static {
436        IOUtil.load();
437        initIDs();
438    }
439}
440