SelectorImpl.java revision 758:bb6bf34f121f
1/*
2 * Copyright (c) 2003, 2015, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation.  Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26package com.sun.corba.se.impl.transport;
27
28import java.io.IOException;
29import java.net.ServerSocket;
30import java.nio.channels.ClosedChannelException;
31import java.nio.channels.SelectableChannel;
32import java.nio.channels.ServerSocketChannel;
33import java.nio.channels.SelectionKey;
34import java.nio.channels.Selector;
35import java.nio.channels.ClosedSelectorException;
36import java.util.ArrayList;
37import java.util.HashMap;
38import java.util.Map;
39import java.util.Iterator;
40import java.util.List;
41
42
43import com.sun.corba.se.pept.broker.Broker;
44import com.sun.corba.se.pept.transport.Acceptor;
45import com.sun.corba.se.pept.transport.Connection;
46import com.sun.corba.se.pept.transport.EventHandler;
47import com.sun.corba.se.pept.transport.ListenerThread;
48import com.sun.corba.se.pept.transport.ReaderThread;
49
50import com.sun.corba.se.spi.logging.CORBALogDomains;
51import com.sun.corba.se.spi.orb.ORB;
52import com.sun.corba.se.spi.orbutil.threadpool.Work;
53import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;
54import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;
55
56import com.sun.corba.se.impl.logging.ORBUtilSystemException;
57import com.sun.corba.se.impl.orbutil.ORBUtility;
58
59/**
60 * @author Harold Carr
61 */
62class SelectorImpl
63    extends
64        Thread
65    implements
66        com.sun.corba.se.pept.transport.Selector
67{
68    private ORB orb;
69    private Selector selector;
70    private long timeout;
71    private List deferredRegistrations;
72    private List interestOpsList;
73    private HashMap listenerThreads;
74    private Map readerThreads;
75    private boolean selectorStarted;
76    private volatile boolean closed;
77    private ORBUtilSystemException wrapper;
78
79
80    public SelectorImpl(ORB orb)
81    {
82        super(null, null, "ORB-Selector-Thread", 0, false);
83        this.orb = orb;
84        selector = null;
85        selectorStarted = false;
86        timeout = 60000;
87        deferredRegistrations = new ArrayList();
88        interestOpsList = new ArrayList();
89        listenerThreads = new HashMap();
90        readerThreads = java.util.Collections.synchronizedMap(new HashMap());
91        closed = false;
92        wrapper = ORBUtilSystemException.get(orb,CORBALogDomains.RPC_TRANSPORT);
93    }
94
95    public void setTimeout(long timeout)
96    {
97        this.timeout = timeout;
98    }
99
100    public long getTimeout()
101    {
102        return timeout;
103    }
104
105    public void registerInterestOps(EventHandler eventHandler)
106    {
107        if (orb.transportDebugFlag) {
108            dprint(".registerInterestOps:-> " + eventHandler);
109        }
110
111        SelectionKey selectionKey = eventHandler.getSelectionKey();
112        if (selectionKey.isValid()) {
113            int ehOps = eventHandler.getInterestOps();
114            SelectionKeyAndOp keyAndOp = new SelectionKeyAndOp(selectionKey, ehOps);
115            synchronized(interestOpsList) {
116                interestOpsList.add(keyAndOp);
117            }
118            // tell Selector Thread there's an update to a SelectorKey's Ops
119            try {
120                if (selector != null) {
121                    // wakeup Selector thread to process close request
122                    selector.wakeup();
123                }
124            } catch (Throwable t) {
125                if (orb.transportDebugFlag) {
126                    dprint(".registerInterestOps: selector.wakeup: ", t);
127                }
128            }
129        }
130        else {
131            wrapper.selectionKeyInvalid(eventHandler.toString());
132            if (orb.transportDebugFlag) {
133                dprint(".registerInterestOps: EventHandler SelectionKey not valid " + eventHandler);
134            }
135        }
136
137        if (orb.transportDebugFlag) {
138            dprint(".registerInterestOps:<- ");
139        }
140    }
141
142    public void registerForEvent(EventHandler eventHandler)
143    {
144        if (orb.transportDebugFlag) {
145            dprint(".registerForEvent: " + eventHandler);
146        }
147
148        if (isClosed()) {
149            if (orb.transportDebugFlag) {
150                dprint(".registerForEvent: closed: " + eventHandler);
151            }
152            return;
153        }
154
155        if (eventHandler.shouldUseSelectThreadToWait()) {
156            synchronized (deferredRegistrations) {
157                deferredRegistrations.add(eventHandler);
158            }
159            if (! selectorStarted) {
160                startSelector();
161            }
162            selector.wakeup();
163            return;
164        }
165
166        switch (eventHandler.getInterestOps()) {
167        case SelectionKey.OP_ACCEPT :
168            createListenerThread(eventHandler);
169            break;
170        case SelectionKey.OP_READ :
171            createReaderThread(eventHandler);
172            break;
173        default:
174            if (orb.transportDebugFlag) {
175                dprint(".registerForEvent: default: " + eventHandler);
176            }
177            throw new RuntimeException(
178                "SelectorImpl.registerForEvent: unknown interest ops");
179        }
180    }
181
182    public void unregisterForEvent(EventHandler eventHandler)
183    {
184        if (orb.transportDebugFlag) {
185            dprint(".unregisterForEvent: " + eventHandler);
186        }
187
188        if (isClosed()) {
189            if (orb.transportDebugFlag) {
190                dprint(".unregisterForEvent: closed: " + eventHandler);
191            }
192            return;
193        }
194
195        if (eventHandler.shouldUseSelectThreadToWait()) {
196            SelectionKey selectionKey ;
197            synchronized(deferredRegistrations) {
198                selectionKey = eventHandler.getSelectionKey();
199            }
200            if (selectionKey != null) {
201                selectionKey.cancel();
202            }
203            if (selector != null) {
204                selector.wakeup();
205            }
206            return;
207        }
208
209        switch (eventHandler.getInterestOps()) {
210        case SelectionKey.OP_ACCEPT :
211            destroyListenerThread(eventHandler);
212            break;
213        case SelectionKey.OP_READ :
214            destroyReaderThread(eventHandler);
215            break;
216        default:
217            if (orb.transportDebugFlag) {
218                dprint(".unregisterForEvent: default: " + eventHandler);
219            }
220            throw new RuntimeException(
221                "SelectorImpl.uregisterForEvent: unknown interest ops");
222        }
223    }
224
225    public void close()
226    {
227        if (orb.transportDebugFlag) {
228            dprint(".close");
229        }
230
231        if (isClosed()) {
232            if (orb.transportDebugFlag) {
233                dprint(".close: already closed");
234            }
235            return;
236        }
237
238        setClosed(true);
239
240        Iterator i;
241
242        // Kill listeners.
243
244        i = listenerThreads.values().iterator();
245        while (i.hasNext()) {
246            ListenerThread listenerThread = (ListenerThread) i.next();
247            listenerThread.close();
248        }
249
250        // Kill readers.
251
252        i = readerThreads.values().iterator();
253        while (i.hasNext()) {
254            ReaderThread readerThread = (ReaderThread) i.next();
255            readerThread.close();
256        }
257
258       clearDeferredRegistrations();
259
260        // Selector
261
262        try {
263            if (selector != null) {
264                // wakeup Selector thread to process close request
265                selector.wakeup();
266            }
267        } catch (Throwable t) {
268            if (orb.transportDebugFlag) {
269                dprint(".close: selector.wakeup: ", t);
270            }
271        }
272    }
273
274    ///////////////////////////////////////////////////
275    //
276    // Thread methods.
277    //
278
279    public void run()
280    {
281        while (!closed) {
282            try {
283                int n = 0;
284                if (timeout == 0 && orb.transportDebugFlag) {
285                    dprint(".run: Beginning of selection cycle");
286                }
287                handleDeferredRegistrations();
288                enableInterestOps();
289                try {
290                    n = selector.select(timeout);
291                } catch (IOException  e) {
292                    if (orb.transportDebugFlag) {
293                        dprint(".run: selector.select: ", e);
294                    }
295                } catch (ClosedSelectorException csEx) {
296                    if (orb.transportDebugFlag) {
297                        dprint(".run: selector.select: ", csEx);
298                    }
299                    break;
300                }
301                if (closed) {
302                    break;
303                }
304                /*
305                  if (timeout == 0 && orb.transportDebugFlag) {
306                  dprint(".run: selector.select() returned: " + n);
307                  }
308                  if (n == 0) {
309                  continue;
310                  }
311                */
312                Iterator iterator = selector.selectedKeys().iterator();
313                if (orb.transportDebugFlag) {
314                    if (iterator.hasNext()) {
315                        dprint(".run: n = " + n);
316                    }
317                }
318                while (iterator.hasNext()) {
319                    SelectionKey selectionKey = (SelectionKey) iterator.next();
320                    iterator.remove();
321                    EventHandler eventHandler = (EventHandler)
322                        selectionKey.attachment();
323                    try {
324                        eventHandler.handleEvent();
325                    } catch (Throwable t) {
326                        if (orb.transportDebugFlag) {
327                            dprint(".run: eventHandler.handleEvent", t);
328                        }
329                    }
330                }
331                if (timeout == 0 && orb.transportDebugFlag) {
332                    dprint(".run: End of selection cycle");
333                }
334            } catch (Throwable t) {
335                // IMPORTANT: ignore all errors so the select thread keeps running.
336                // Otherwise a guaranteed hang.
337                if (orb.transportDebugFlag) {
338                    dprint(".run: ignoring", t);
339                }
340            }
341        }
342        try {
343            if (selector != null) {
344                if (orb.transportDebugFlag) {
345                    dprint(".run: selector.close ");
346                }
347                selector.close();
348            }
349        } catch (Throwable t) {
350            if (orb.transportDebugFlag) {
351                dprint(".run: selector.close: ", t);
352            }
353        }
354    }
355
356    /////////////////////////////////////////////////////
357    //
358    // Implementation.
359    //
360
361    private void clearDeferredRegistrations() {
362        synchronized (deferredRegistrations) {
363            int deferredListSize = deferredRegistrations.size();
364            if (orb.transportDebugFlag) {
365                dprint(".clearDeferredRegistrations:deferred list size == " + deferredListSize);
366            }
367            for (int i = 0; i < deferredListSize; i++) {
368                EventHandler eventHandler =
369                    (EventHandler)deferredRegistrations.get(i);
370                if (orb.transportDebugFlag) {
371                    dprint(".clearDeferredRegistrations: " + eventHandler);
372                }
373                SelectableChannel channel = eventHandler.getChannel();
374                SelectionKey selectionKey = null;
375
376                try {
377                    if (orb.transportDebugFlag) {
378                        dprint(".clearDeferredRegistrations:close channel == "
379                                + channel);
380                        dprint(".clearDeferredRegistrations:close channel class == "
381                                + channel.getClass().getName());
382                    }
383                    channel.close();
384                    selectionKey = eventHandler.getSelectionKey();
385                    if (selectionKey != null) {
386                        selectionKey.cancel();
387                        selectionKey.attach(null);
388                    }
389                } catch (IOException ioEx) {
390                    if (orb.transportDebugFlag) {
391                        dprint(".clearDeferredRegistrations: ", ioEx);
392                    }
393                }
394            }
395            deferredRegistrations.clear();
396        }
397    }
398
399    private synchronized boolean isClosed ()
400    {
401        return closed;
402    }
403
404    private synchronized void setClosed(boolean closed)
405    {
406        this.closed = closed;
407    }
408
409    private void startSelector()
410    {
411        try {
412            selector = Selector.open();
413        } catch (IOException e) {
414            if (orb.transportDebugFlag) {
415                dprint(".startSelector: Selector.open: IOException: ", e);
416            }
417            // REVISIT - better handling/reporting
418            RuntimeException rte =
419                new RuntimeException(".startSelector: Selector.open exception");
420            rte.initCause(e);
421            throw rte;
422        }
423        setDaemon(true);
424        start();
425        selectorStarted = true;
426        if (orb.transportDebugFlag) {
427            dprint(".startSelector: selector.start completed.");
428        }
429    }
430
431    private void handleDeferredRegistrations()
432    {
433        synchronized (deferredRegistrations) {
434            int deferredListSize = deferredRegistrations.size();
435            for (int i = 0; i < deferredListSize; i++) {
436                EventHandler eventHandler =
437                    (EventHandler)deferredRegistrations.get(i);
438                if (orb.transportDebugFlag) {
439                    dprint(".handleDeferredRegistrations: " + eventHandler);
440                }
441                SelectableChannel channel = eventHandler.getChannel();
442                SelectionKey selectionKey = null;
443                try {
444                    selectionKey =
445                        channel.register(selector,
446                                         eventHandler.getInterestOps(),
447                                         (Object)eventHandler);
448                } catch (ClosedChannelException e) {
449                    if (orb.transportDebugFlag) {
450                        dprint(".handleDeferredRegistrations: ", e);
451                    }
452                }
453                eventHandler.setSelectionKey(selectionKey);
454            }
455            deferredRegistrations.clear();
456        }
457    }
458
459    private void enableInterestOps()
460    {
461        synchronized (interestOpsList) {
462            int listSize = interestOpsList.size();
463            if (listSize > 0) {
464                if (orb.transportDebugFlag) {
465                    dprint(".enableInterestOps:->");
466                }
467                SelectionKey selectionKey = null;
468                SelectionKeyAndOp keyAndOp = null;
469                int keyOp, selectionKeyOps = 0;
470                for (int i = 0; i < listSize; i++) {
471                    keyAndOp = (SelectionKeyAndOp)interestOpsList.get(i);
472                    selectionKey = keyAndOp.selectionKey;
473
474                    // Need to check if the SelectionKey is valid because a
475                    // connection's SelectionKey could be put on the list to
476                    // have its OP enabled and before it's enabled be reclaimed.
477                    // Otherwise, the enabling of the OP will throw an exception
478                    // here and exit this method an potentially not enable all
479                    // registered ops.
480                    //
481                    // So, we ignore SelectionKeys that are invalid. They will get
482                    // cleaned up on the next Selector.select() call.
483
484                    if (selectionKey.isValid()) {
485                        if (orb.transportDebugFlag) {
486                            dprint(".enableInterestOps: " + keyAndOp);
487                        }
488                        keyOp = keyAndOp.keyOp;
489                        selectionKeyOps = selectionKey.interestOps();
490                        selectionKey.interestOps(selectionKeyOps | keyOp);
491                    }
492                }
493                interestOpsList.clear();
494                if (orb.transportDebugFlag) {
495                    dprint(".enableInterestOps:<-");
496                }
497            }
498        }
499    }
500
501    private void createListenerThread(EventHandler eventHandler)
502    {
503        if (orb.transportDebugFlag) {
504            dprint(".createListenerThread: " + eventHandler);
505        }
506        Acceptor acceptor = eventHandler.getAcceptor();
507        ListenerThread listenerThread =
508            new ListenerThreadImpl(orb, acceptor, this);
509        listenerThreads.put(eventHandler, listenerThread);
510        Throwable throwable = null;
511        try {
512            orb.getThreadPoolManager().getThreadPool(0)
513                .getWorkQueue(0).addWork((Work)listenerThread);
514        } catch (NoSuchThreadPoolException e) {
515            throwable = e;
516        } catch (NoSuchWorkQueueException e) {
517            throwable = e;
518        }
519        if (throwable != null) {
520            RuntimeException rte = new RuntimeException(throwable.toString());
521            rte.initCause(throwable);
522            throw rte;
523        }
524    }
525
526    private void destroyListenerThread(EventHandler eventHandler)
527    {
528        if (orb.transportDebugFlag) {
529            dprint(".destroyListenerThread: " + eventHandler);
530        }
531        ListenerThread listenerThread = (ListenerThread)
532            listenerThreads.get(eventHandler);
533        if (listenerThread == null) {
534            if (orb.transportDebugFlag) {
535                dprint(".destroyListenerThread: cannot find ListenerThread - ignoring.");
536            }
537            return;
538        }
539        listenerThreads.remove(eventHandler);
540        listenerThread.close();
541    }
542
543    private void createReaderThread(EventHandler eventHandler)
544    {
545        if (orb.transportDebugFlag) {
546            dprint(".createReaderThread: " + eventHandler);
547        }
548        Connection connection = eventHandler.getConnection();
549        ReaderThread readerThread =
550            new ReaderThreadImpl(orb, connection, this);
551        readerThreads.put(eventHandler, readerThread);
552        Throwable throwable = null;
553        try {
554            orb.getThreadPoolManager().getThreadPool(0)
555                .getWorkQueue(0).addWork((Work)readerThread);
556        } catch (NoSuchThreadPoolException e) {
557            throwable = e;
558        } catch (NoSuchWorkQueueException e) {
559            throwable = e;
560        }
561        if (throwable != null) {
562            RuntimeException rte = new RuntimeException(throwable.toString());
563            rte.initCause(throwable);
564            throw rte;
565        }
566    }
567
568    private void destroyReaderThread(EventHandler eventHandler)
569    {
570        if (orb.transportDebugFlag) {
571            dprint(".destroyReaderThread: " + eventHandler);
572        }
573        ReaderThread readerThread = (ReaderThread)
574            readerThreads.get(eventHandler);
575        if (readerThread == null) {
576            if (orb.transportDebugFlag) {
577                dprint(".destroyReaderThread: cannot find ReaderThread - ignoring.");
578            }
579            return;
580        }
581        readerThreads.remove(eventHandler);
582        readerThread.close();
583    }
584
585    private void dprint(String msg)
586    {
587        ORBUtility.dprint("SelectorImpl", msg);
588    }
589
590    protected void dprint(String msg, Throwable t)
591    {
592        dprint(msg);
593        t.printStackTrace(System.out);
594    }
595
596    // Private class to contain a SelectionKey and a SelectionKey op.
597    // Used only by SelectorImpl to register and enable SelectionKey
598    // Op.
599    // REVISIT - Could do away with this class and use the EventHanlder
600    //           directly.
601    private class SelectionKeyAndOp
602    {
603        // A SelectionKey.[OP_READ|OP_WRITE|OP_ACCEPT|OP_CONNECT]
604        public int keyOp;
605        public SelectionKey selectionKey;
606
607        // constructor
608        public SelectionKeyAndOp(SelectionKey selectionKey, int keyOp) {
609            this.selectionKey = selectionKey;
610            this.keyOp = keyOp;
611        }
612    }
613
614// End of file.
615}
616