SelectorImpl.java revision 608:7e06bf1dcb09
1/*
2 * Copyright (c) 2003, 2013, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package com.sun.corba.se.impl.transport;
27
28import java.io.IOException;
29import java.nio.channels.ClosedChannelException;
30import java.nio.channels.SelectableChannel;
31import java.nio.channels.SelectionKey;
32import java.nio.channels.Selector;
33import java.util.ArrayList;
34import java.util.HashMap;
35import java.util.Map;
36import java.util.Iterator;
37import java.util.List;
38
39import com.sun.corba.se.pept.broker.Broker;
40import com.sun.corba.se.pept.transport.Acceptor;
41import com.sun.corba.se.pept.transport.Connection;
42import com.sun.corba.se.pept.transport.EventHandler;
43import com.sun.corba.se.pept.transport.ListenerThread;
44import com.sun.corba.se.pept.transport.ReaderThread;
45
46import com.sun.corba.se.spi.logging.CORBALogDomains;
47import com.sun.corba.se.spi.orb.ORB;
48import com.sun.corba.se.spi.orbutil.threadpool.Work;
49import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
50import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
51
52import com.sun.corba.se.impl.logging.ORBUtilSystemException;
53import com.sun.corba.se.impl.orbutil.ORBUtility;
54
55/**
56 * @author Harold Carr
57 */
58class SelectorImpl
59    extends
60        Thread
61    implements
62        com.sun.corba.se.pept.transport.Selector
63{
64    private ORB orb;
65    private Selector selector;
66    private long timeout;
67    private List deferredRegistrations;
68    private List interestOpsList;
69    private HashMap listenerThreads;
70    private Map readerThreads;
71    private boolean selectorStarted;
72    private volatile boolean closed;
73    private ORBUtilSystemException wrapper;
74
75
76    public SelectorImpl(ORB orb)
77    {
78        this.orb = orb;
79        selector = null;
80        selectorStarted = false;
81        timeout = 60000;
82        deferredRegistrations = new ArrayList();
83        interestOpsList = new ArrayList();
84        listenerThreads = new HashMap();
85        readerThreads = java.util.Collections.synchronizedMap(new HashMap());
86        closed = false;
87        wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT);
88    }
89
90    public void setTimeout(long timeout)
91    {
92        this.timeout = timeout;
93    }
94
95    public long getTimeout()
96    {
97        return timeout;
98    }
99
100    public void registerInterestOps(EventHandler eventHandler)
101    {
102        if (orb.transportDebugFlag) {
103            dprint(".registerInterestOps:-> " + eventHandler);
104        }
105
106        SelectionKey selectionKey = eventHandler.getSelectionKey();
107        if (selectionKey.isValid()) {
108            int ehOps = eventHandler.getInterestOps();
109            SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
110            synchronized(interestOpsList) {
111                interestOpsList.add(keyAndOp);
112            }
113            // tell Selector Thread there's an update to a SelectorKey's Ops
114            selector.wakeup();
115        }
116        else {
117            wrapper.selectionKeyInvalid(eventHandler.toString());
118            if (orb.transportDebugFlag) {
119                dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
120            }
121        }
122
123        if (orb.transportDebugFlag) {
124            dprint(".registerInterestOps:<- ");
125        }
126    }
127
128    public void registerForEvent(EventHandler eventHandler)
129    {
130        if (orb.transportDebugFlag) {
131            dprint(".registerForEvent: " + eventHandler);
132        }
133
134        if (isClosed()) {
135            if (orb.transportDebugFlag) {
136                dprint(".registerForEvent: closed: " + eventHandler);
137            }
138            return;
139        }
140
141        if (eventHandler.shouldUseSelectThreadToWait()) {
142            synchronized (deferredRegistrations) {
143                deferredRegistrations.add(eventHandler);
144            }
145            if (! selectorStarted) {
146                startSelector();
147            }
148            selector.wakeup();
149            return;
150        }
151
152        switch (eventHandler.getInterestOps()) {
153        case SelectionKey.OP_ACCEPT :
154            createListenerThread(eventHandler);
155            break;
156        case SelectionKey.OP_READ :
157            createReaderThread(eventHandler);
158            break;
159        default:
160            if (orb.transportDebugFlag) {
161                dprint(".registerForEvent: default: " + eventHandler);
162            }
163            throw new RuntimeException(
164                "SelectorImpl.registerForEvent: unknown interest ops");
165        }
166    }
167
168    public void unregisterForEvent(EventHandler eventHandler)
169    {
170        if (orb.transportDebugFlag) {
171            dprint(".unregisterForEvent: " + eventHandler);
172        }
173
174        if (isClosed()) {
175            if (orb.transportDebugFlag) {
176                dprint(".unregisterForEvent: closed: " + eventHandler);
177            }
178            return;
179        }
180
181        if (eventHandler.shouldUseSelectThreadToWait()) {
182            SelectionKey selectionKey ;
183            synchronized(deferredRegistrations) {
184                selectionKey = eventHandler.getSelectionKey();
185            }
186            if (selectionKey != null) {
187                selectionKey.cancel();
188            }
189            selector.wakeup();
190            return;
191        }
192
193        switch (eventHandler.getInterestOps()) {
194        case SelectionKey.OP_ACCEPT :
195            destroyListenerThread(eventHandler);
196            break;
197        case SelectionKey.OP_READ :
198            destroyReaderThread(eventHandler);
199            break;
200        default:
201            if (orb.transportDebugFlag) {
202                dprint(".unregisterForEvent: default: " + eventHandler);
203            }
204            throw new RuntimeException(
205                "SelectorImpl.uregisterForEvent: unknown interest ops");
206        }
207    }
208
209    public void close()
210    {
211        if (orb.transportDebugFlag) {
212            dprint(".close");
213        }
214
215        if (isClosed()) {
216            if (orb.transportDebugFlag) {
217                dprint(".close: already closed");
218            }
219            return;
220        }
221
222        setClosed(true);
223
224        Iterator i;
225
226        // Kill listeners.
227
228        i = listenerThreads.values().iterator();
229        while (i.hasNext()) {
230            ListenerThread listenerThread = (ListenerThread) i.next();
231            listenerThread.close();
232        }
233
234        // Kill readers.
235
236        i = readerThreads.values().iterator();
237        while (i.hasNext()) {
238            ReaderThread readerThread = (ReaderThread) i.next();
239            readerThread.close();
240        }
241
242        // Selector
243
244        try {
245            if (selector != null) {
246                // wakeup Selector thread to process close request
247                selector.wakeup();
248            }
249        } catch (Throwable t) {
250            if (orb.transportDebugFlag) {
251                dprint(".close: selector.close: " + t);
252            }
253        }
254    }
255
256    ///////////////////////////////////////////////////
257    //
258    // Thread methods.
259    //
260
261    public void run()
262    {
263        setName("SelectorThread");
264        while (!closed) {
265            try {
266                int n = 0;
267                if (timeout == 0 && orb.transportDebugFlag) {
268                    dprint(".run: Beginning of selection cycle");
269                }
270                handleDeferredRegistrations();
271                enableInterestOps();
272                try {
273                    n = selector.select(timeout);
274                } catch (IOException  e) {
275                    if (orb.transportDebugFlag) {
276                        dprint(".run: selector.select: " + e);
277                    }
278                }
279                if (closed) {
280                    selector.close();
281                    if (orb.transportDebugFlag) {
282                        dprint(".run: closed - .run return");
283                    }
284                    return;
285                }
286                /*
287                  if (timeout == 0 && orb.transportDebugFlag) {
288                  dprint(".run: selector.select() returned: " + n);
289                  }
290                  if (n == 0) {
291                  continue;
292                  }
293                */
294                Iterator iterator = selector.selectedKeys().iterator();
295                if (orb.transportDebugFlag) {
296                    if (iterator.hasNext()) {
297                        dprint(".run: n = " + n);
298                    }
299                }
300                while (iterator.hasNext()) {
301                    SelectionKey selectionKey = (SelectionKey) iterator.next();
302                    iterator.remove();
303                    EventHandler eventHandler = (EventHandler)
304                        selectionKey.attachment();
305                    try {
306                        eventHandler.handleEvent();
307                    } catch (Throwable t) {
308                        if (orb.transportDebugFlag) {
309                            dprint(".run: eventHandler.handleEvent", t);
310                        }
311                    }
312                }
313                if (timeout == 0 && orb.transportDebugFlag) {
314                    dprint(".run: End of selection cycle");
315                }
316            } catch (Throwable t) {
317                // IMPORTANT: ignore all errors so the select thread keeps running.
318                // Otherwise a guaranteed hang.
319                if (orb.transportDebugFlag) {
320                    dprint(".run: ignoring", t);
321                }
322            }
323        }
324    }
325
326    /////////////////////////////////////////////////////
327    //
328    // Implementation.
329    //
330
331    private synchronized boolean isClosed ()
332    {
333        return closed;
334    }
335
336    private synchronized void setClosed(boolean closed)
337    {
338        this.closed = closed;
339    }
340
341    private void startSelector()
342    {
343        try {
344            selector = Selector.open();
345        } catch (IOException e) {
346            if (orb.transportDebugFlag) {
347                dprint(".startSelector: Selector.open: IOException: " + e);
348            }
349            // REVISIT - better handling/reporting
350            RuntimeException rte =
351                new RuntimeException(".startSelector: Selector.open exception");
352            rte.initCause(e);
353            throw rte;
354        }
355        setDaemon(true);
356        start();
357        selectorStarted = true;
358        if (orb.transportDebugFlag) {
359            dprint(".startSelector: selector.start completed.");
360        }
361    }
362
363    private void handleDeferredRegistrations()
364    {
365        synchronized (deferredRegistrations) {
366            int deferredListSize = deferredRegistrations.size();
367            for (int i = 0; i < deferredListSize; i++) {
368                EventHandler eventHandler =
369                    (EventHandler)deferredRegistrations.get(i);
370                if (orb.transportDebugFlag) {
371                    dprint(".handleDeferredRegistrations: " + eventHandler);
372                }
373                SelectableChannel channel = eventHandler.getChannel();
374                SelectionKey selectionKey = null;
375                try {
376                    selectionKey =
377                        channel.register(selector,
378                                         eventHandler.getInterestOps(),
379                                         (Object)eventHandler);
380                } catch (ClosedChannelException e) {
381                    if (orb.transportDebugFlag) {
382                        dprint(".handleDeferredRegistrations: " + e);
383                    }
384                }
385                eventHandler.setSelectionKey(selectionKey);
386            }
387            deferredRegistrations.clear();
388        }
389    }
390
391    private void enableInterestOps()
392    {
393        synchronized (interestOpsList) {
394            int listSize = interestOpsList.size();
395            if (listSize > 0) {
396                if (orb.transportDebugFlag) {
397                    dprint(".enableInterestOps:->");
398                }
399                SelectionKey selectionKey = null;
400                SelectionKeyAndOp keyAndOp = null;
401                int keyOp, selectionKeyOps = 0;
402                for (int i = 0; i < listSize; i++) {
403                    keyAndOp = (SelectionKeyAndOp)interestOpsList.get(i);
404                    selectionKey = keyAndOp.selectionKey;
405
406                    // Need to check if the SelectionKey is valid because a
407                    // connection's SelectionKey could be put on the list to
408                    // have its OP enabled and before it's enabled be reclaimed.
409                    // Otherwise, the enabling of the OP will throw an exception
410                    // here and exit this method an potentially not enable all
411                    // registered ops.
412                    //
413                    // So, we ignore SelectionKeys that are invalid. They will get
414                    // cleaned up on the next Selector.select() call.
415
416                    if (selectionKey.isValid()) {
417                        if (orb.transportDebugFlag) {
418                            dprint(".enableInterestOps: " + keyAndOp);
419                        }
420                        keyOp = keyAndOp.keyOp;
421                        selectionKeyOps = selectionKey.interestOps();
422                        selectionKey.interestOps(selectionKeyOps | keyOp);
423                    }
424                }
425                interestOpsList.clear();
426                if (orb.transportDebugFlag) {
427                    dprint(".enableInterestOps:<-");
428                }
429            }
430        }
431    }
432
433    private void createListenerThread(EventHandler eventHandler)
434    {
435        if (orb.transportDebugFlag) {
436            dprint(".createListenerThread: " + eventHandler);
437        }
438        Acceptor acceptor = eventHandler.getAcceptor();
439        ListenerThread listenerThread =
440            new ListenerThreadImpl(orb, acceptor, this);
441        listenerThreads.put(eventHandler, listenerThread);
442        Throwable throwable = null;
443        try {
444            orb.getThreadPoolManager().getThreadPool(0)
445                .getWorkQueue(0).addWork((Work)listenerThread);
446        } catch (NoSuchThreadPoolException e) {
447            throwable = e;
448        } catch (NoSuchWorkQueueException e) {
449            throwable = e;
450        }
451        if (throwable != null) {
452            RuntimeException rte = new RuntimeException(throwable.toString());
453            rte.initCause(throwable);
454            throw rte;
455        }
456    }
457
458    private void destroyListenerThread(EventHandler eventHandler)
459    {
460        if (orb.transportDebugFlag) {
461            dprint(".destroyListenerThread: " + eventHandler);
462        }
463        ListenerThread listenerThread = (ListenerThread)
464            listenerThreads.get(eventHandler);
465        if (listenerThread == null) {
466            if (orb.transportDebugFlag) {
467                dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");
468            }
469            return;
470        }
471        listenerThreads.remove(eventHandler);
472        listenerThread.close();
473    }
474
475    private void createReaderThread(EventHandler eventHandler)
476    {
477        if (orb.transportDebugFlag) {
478            dprint(".createReaderThread: " + eventHandler);
479        }
480        Connection connection = eventHandler.getConnection();
481        ReaderThread readerThread =
482            new ReaderThreadImpl(orb, connection, this);
483        readerThreads.put(eventHandler, readerThread);
484        Throwable throwable = null;
485        try {
486            orb.getThreadPoolManager().getThreadPool(0)
487                .getWorkQueue(0).addWork((Work)readerThread);
488        } catch (NoSuchThreadPoolException e) {
489            throwable = e;
490        } catch (NoSuchWorkQueueException e) {
491            throwable = e;
492        }
493        if (throwable != null) {
494            RuntimeException rte = new RuntimeException(throwable.toString());
495            rte.initCause(throwable);
496            throw rte;
497        }
498    }
499
500    private void destroyReaderThread(EventHandler eventHandler)
501    {
502        if (orb.transportDebugFlag) {
503            dprint(".destroyReaderThread: " + eventHandler);
504        }
505        ReaderThread readerThread = (ReaderThread)
506            readerThreads.get(eventHandler);
507        if (readerThread == null) {
508            if (orb.transportDebugFlag) {
509                dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");
510            }
511            return;
512        }
513        readerThreads.remove(eventHandler);
514        readerThread.close();
515    }
516
517    private void dprint(String msg)
518    {
519        ORBUtility.dprint("SelectorImpl", msg);
520    }
521
522    protected void dprint(String msg, Throwable t)
523    {
524        dprint(msg);
525        t.printStackTrace(System.out);
526    }
527
528    // Private class to contain a SelectionKey and a SelectionKey op.
529    // Used only by SelectorImpl to register and enable SelectionKey
530    // Op.
531    // REVISIT - Could do away with this class and use the EventHanlder
532    //           directly.
533    private class SelectionKeyAndOp
534    {
535        // A SelectionKey.[OP_READ|OP_WRITE|OP_ACCEPT|OP_CONNECT]
536        public int keyOp;
537        public SelectionKey selectionKey;
538
539        // constructor
540        public SelectionKeyAndOp(SelectionKey selectionKey, int keyOp) {
541            this.selectionKey = selectionKey;
542            this.keyOp = keyOp;
543        }
544    }
545
546// End of file.
547}
548