1/*
2 * Copyright (c) 2010 Apple Inc. All rights reserved.
3 *
4 * @APPLE_LICENSE_HEADER_START@
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * 1.  Redistributions of source code must retain the above copyright
11 *     notice, this list of conditions and the following disclaimer.
12 * 2.  Redistributions in binary form must reproduce the above copyright
13 *     notice, this list of conditions and the following disclaimer in the
14 *     documentation and/or other materials provided with the distribution.
15 * 3.  Neither the name of Apple Inc. ("Apple") nor the names of its
16 *     contributors may be used to endorse or promote products derived from
17 *     this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY APPLE AND ITS CONTRIBUTORS "AS IS" AND ANY
20 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 * DISCLAIMED. IN NO EVENT SHALL APPLE OR ITS CONTRIBUTORS BE LIABLE FOR ANY
23 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
28 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 *
30 * Portions of this software have been released under the following terms:
31 *
32 * (c) Copyright 1989-1993 OPEN SOFTWARE FOUNDATION, INC.
33 * (c) Copyright 1989-1993 HEWLETT-PACKARD COMPANY
34 * (c) Copyright 1989-1993 DIGITAL EQUIPMENT CORPORATION
35 *
36 * To anyone who acknowledges that this file is provided "AS IS"
37 * without any express or implied warranty:
38 * permission to use, copy, modify, and distribute this file for any
39 * purpose is hereby granted without fee, provided that the above
40 * copyright notices and this notice appears in all source code copies,
41 * and that none of the names of Open Software Foundation, Inc., Hewlett-
42 * Packard Company or Digital Equipment Corporation be used
43 * in advertising or publicity pertaining to distribution of the software
44 * without specific, written prior permission.  Neither Open Software
45 * Foundation, Inc., Hewlett-Packard Company nor Digital
46 * Equipment Corporation makes any representations about the suitability
47 * of this software for any purpose.
48 *
49 * Copyright (c) 2007, Novell, Inc. All rights reserved.
50 * Redistribution and use in source and binary forms, with or without
51 * modification, are permitted provided that the following conditions
52 * are met:
53 *
54 * 1.  Redistributions of source code must retain the above copyright
55 *     notice, this list of conditions and the following disclaimer.
56 * 2.  Redistributions in binary form must reproduce the above copyright
57 *     notice, this list of conditions and the following disclaimer in the
58 *     documentation and/or other materials provided with the distribution.
59 * 3.  Neither the name of Novell Inc. nor the names of its contributors
60 *     may be used to endorse or promote products derived from this
61 *     this software without specific prior written permission.
62 *
63 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
64 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
65 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
66 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY
67 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
68 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
69 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
70 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
71 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
72 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
73 *
74 * @APPLE_LICENSE_HEADER_END@
75 */
76
77/*
78**
79**  NAME
80**
81**      comcthd.c
82**
83**  FACILITY:
84**
85**      Remote Procedure Call (RPC)
86**
87**  ABSTRACT:
88**
89**  Definition of the Call Thread Services for the Common
90**  Communication Services component. These routines permit
91**  a call thread to be created and invoked.
92**
93**
94*/
95
96#include <commonp.h>    /* Common declarations for all RPC runtime  */
97#include <com.h>        /* Common communications services           */
98#include <comprot.h>    /* Common protocol services                 */
99#include <comnaf.h>     /* Common network address family services   */
100#include <comp.h>       /* Private communications services          */
101#include <comcthd.h>    /* Shared call thread services              */
102
103
104/*
105 * The multiplier to apply to a pool's n_threads to get the queue depth.
106 */
107#ifndef RPC_C_CTHREAD_QUEUE_MULTIPLIER
108#  define RPC_C_CTHREAD_QUEUE_MULTIPLIER    8
109#endif
110
111/*
112 * Call thread states
113 */
114#define RPC_C_NO_CTHREAD        0       /* no thread exists */
115#define RPC_C_IDLE_CTHREAD      1       /* thread exists but is idle */
116#define RPC_C_ACTIVE_CTHREAD    2       /* thread has call allocated to it */
117
118/*
119 * The call thread table element structure.
120 * See the "Call Thread Pool" description for more info...
121 *
122 * When a thread is idle, it is waiting on its private condition variable.
123 *
124 * The per element "pool" pointer simply solves the problem of needing
125 * the created thread to have two args (without having to alloc memory
 * just for the create phase).  The space really isn't "wasted" since
 * each thread would otherwise have to keep this var on their stack.
128 */
typedef struct
{
    unsigned8                           thread_state;   /* RPC_C_{NO,IDLE,ACTIVE}_CTHREAD */
    dcethread*                           thread_id;     /* set by cthread_create; used for pvt->thread_h */
    rpc_cond_t                          thread_cond;    /* idle thread waits here for a call assignment */
    struct cthread_pool_elt_t           *pool;          /* back pointer to the owning pool (see above) */
    rpc_call_rep_p_t                    call_rep;       /* the call currently assigned to this thread */
} cthread_elt_t, *cthread_elt_p_t;
137
138/*
139 * Reserved Pool Call Queue Element.
140 *
141 * These queue elements form a backing structure that allows
142 * us to maintain a call_rep on two pool lists.
143 */
typedef struct {
    rpc_list_t                          link;       /* linkage on pool.call_queue or pool.free_queue */
    struct cthread_pool_elt_t           *pool;      /* the reserved pool owning this element */
    rpc_call_rep_p_t                    call_rep;   /* queued call (also directly linked on the default pool's queue) */
} cthread_queue_elt_t, *cthread_queue_elt_p_t;
149
150/*
151 * Call Thread Pools.
152 *
153 * READ THIS IF YOU WANT TO UNDERSTAND THIS STUFF!
154 *
155 * This structure exists to accomplish the desired effect of allowing
156 * applications to have calls execute using threads from some application
157 * defined set of thread pools.  The application declared thread pools are
 * referred to as "reserved" pools (due to a previous incarnation of this
159 * code which provided -a too limited scheme of- thread reservation on
160 * per-interface basis).
161 *
162 * The application (via the pool lookup function callout) gets to decide
163 * which pool a call should be associated with.  This general mechanism
164 * provides the application with the basic hooks to implement any one
165 * of a number of schemes for allocating calls to threads.
166 *
167 * If the application declares that the call should use a reserved pool,
168 * a free thread from that pool will be allocated, otherwise, a default
169 * pool free thread will be allocated, otherwise, the call will be put
170 * on the default and reserved pool queue for execution by the first
171 * available default or reserved pool thread.  In any case, the calls
172 * for a reserved pool are assigned idle execution threads in the order
173 * in which they are received.
174 *
175 * The default pool is created by rpc_server_listen().  Non-reserved
176 * (default pool) threads can execute *any* call (the idle threads are
177 * assigned to calls in the order in which they are received).  The total
178 * number of call threads in a server is the sum of the threads in the
179 * default pool and the reserved pools.
180 *
181 * The relationship between n_queued, max_queued and the call_queue
182 * requires some explanation.  n_queued and max_queued represent the
183 * number and limit respectively of the number of call_queue entries
184 * FOR THIS POOL TYPE.  For *reserved pools*, all of these variables
185 * make sense in an intuitive way: n_queued always represents the true
186 * number of elements on the queue and the number of elements on the
187 * queue will not exceed max_queued.
188 *
189 * The default pool's use of these variables is less intuitive since
190 * *all* queued calls are on the default pool's queue.  In this case,
191 * the number of elements on the queue can *exceed* the default pool's
192 * n_queued and max_queued!  n_queued and max_queued (as stated above)
193 * strictly represent the number of default pool calls (i.e. those calls
194 * that are not associated with a reserved pool).  The end result is
195 * that this accomplishes the desired max queuing limitations - i.e.
 * the maximums are imposed on a per pool basis;  use the queue
 * to determine if there are actually any queued calls to process,
198 * NOT n_queued.
199 *
200 * The default pool uses its pool.call_queue to directly link call reps
201 * and it does not use the pool.free_queue.  The reserved pools can't
202 * directly link the call reps (because call reps only have one rpc_list
203 * "thread", because the rpc_list macros only work for a single
204 * "thread"...).  Therefore, the reserved pools each maintain their
205 * own (static) set of cthread_call_queue_elt_t elements.  Reserved pools'
206 * call_queue consists of a set of call_queue_elts that point to the
207 * associated call_reps (which are on the default pool queue).  The
208 * call_queue_elts are maintained on the pool's pool.free_queue when
209 * not in use on the pool.call_queue.
210 *
211 * Startup / Shutdown processing...  We have a requirement that
212 * rpc_server_listen() can be called, return and then be called again
213 * (a startup, shutdown, restart sequence).  All call threads should
214 * be terminated by a shutdown (to free up resources).  All threads
215 * (including those requested by previous calls to
216 * rpc_server_create_thread_pool()) must be automatically recreated upon
217 * a restart.
218 *
219 * Pool creation is two step process.  First, a pool descriptor is
220 * allocated (cthread_pool_alloc()).  Subsequently, a pool may be "started"
221 * (cthread_pool_start()); this actually creates the call threads.
222 *
223 * Shutting down involves stopping all of the call threads in all of
224 * the pools (including freeing each pool's call thread table).  The
225 * pool descriptors are not freed.  This is necessary to retain information
226 * that is needed to restart the server.
227 */
228
typedef struct cthread_pool_elt_t {
    rpc_list_t      link;           /* linked list of pools */
    unsigned16      n_threads;      /* total number of threads in the pool */
    unsigned16      n_idle;         /* number of idle (available) threads */
    cthread_elt_p_t ctbl;           /* the cthreads associated with the pool */
    cthread_elt_p_t idle_cthread;   /* pointer to a known idle cthread */
    unsigned32      n_queued;       /* number of queued calls FOR THIS POOL TYPE; see above! */
    unsigned32      max_queued;     /* per-pool-type queuing limit; see above! */
    rpc_list_t      call_queue;     /* list of calls queued for this pool; see above! */
    rpc_list_t      free_queue;     /* list of free call_queue elements (reserved pools only); see above! */
    unsigned        stop : 1;       /* T => pool's threads stop when complete */
    unsigned        queue_elt_alloc : 1;   /* T => start() should allocate queue elts */
} cthread_pool_elt_t, *cthread_pool_elt_p_t;
242
243
244/*
245 * Pools are only associated with the MAJOR version of an interface.
246 */
247
248#define IF_VERS_MAJOR(_vers) ((_vers) & 0xffff)
249#define IF_VERS_MINOR(_vers) (((_vers) >> 16) & 0xffff)
250
251/*
252 * A couple of macros for convienience.
253 */
254
255#define CTHREAD_POOL_IS_QUEUE_FULL(p)   ((p)->n_queued >= (p)->max_queued)
256#define CTHREAD_POOL_IS_QUEUE_EMPTY(p)  (RPC_LIST_EMPTY ((p)->call_queue))
257
258/*
259 * A couple of macros for (fast path) performance.
260 */
261
262#define CTHREAD_POOL_LOOKUP_RESERVED(object, if_uuid, if_ver, if_opnum, p, st) \
263    { \
264        RPC_MUTEX_LOCK_ASSERT (cthread_mutex); \
265        if (cthread_pool_lookup_fn == NULL) \
266        { \
267            *(p) = NULL; \
268            *st = 0; \
269        } \
270        else \
271        { \
272            rpc_if_id_t if_id; \
273            if_id.uuid = *(if_uuid); \
274            if_id.vers_major = IF_VERS_MAJOR(if_ver); \
275            if_id.vers_minor = IF_VERS_MINOR(if_ver); \
276            (*cthread_pool_lookup_fn) (\
277                    object, &if_id, if_opnum, \
278                    (rpc_thread_pool_handle_t *)p, st); \
279        } \
280    }
281
282#define CTHREAD_POOL_ASSIGN_THREAD(p, ct) \
283    { \
284        RPC_MUTEX_LOCK_ASSERT (cthread_mutex); \
285        if ((p)->idle_cthread != NULL) \
286        { \
287            *(ct) = (p)->idle_cthread; \
288            (p)->idle_cthread = NULL; \
289            assert((*(ct))->thread_state == RPC_C_IDLE_CTHREAD); \
290            (*(ct))->thread_state = RPC_C_ACTIVE_CTHREAD; \
291            (p)->n_idle--; \
292        } \
293        else \
294        { \
295            *(ct) = cthread_pool_assign_thread(p); \
296        } \
297    }
298
299#define CTHREAD_POOL_IDLE_THREAD(p, ct) \
300    { \
301        (p)->n_idle++; \
302        (p)->idle_cthread = ct; \
303    }
304
305
306
307/*
308 * The reserved pools.
309 *
310 * The pools are linked together via their pool.pool_list field.
311 */
312INTERNAL rpc_list_t             cthread_reserved_pools;
313
314/*
315 * A handle to the special default pool.
316 */
317INTERNAL cthread_pool_elt_p_t   cthread_default_pool;
318
319/*
320 * The maximum number of calls that will be queued for the default
321 * thread pool.  This value is settable via the rpc_server_set_thread_pool_qlen
322 * function.  If not set, a default value of 8 times the number of
323 * default pool threads is used.
324 */
325INTERNAL unsigned32 cthread_default_call_queue_size;
326
327/*
328 * The "reaper's" pool queue and timer.
329 *
330 * The pools are linked together via their pool.pool_list field.
331 */
332INTERNAL rpc_list_t             cthread_reaper_queue;
333INTERNAL rpc_timer_t            cthread_reaper_timer;
334
335#ifndef RPC_C_CTHREAD_REAPER_FREQ
336#  define RPC_C_CTHREAD_REAPER_FREQ     RPC_CLOCK_SEC(3*60)
337#endif
338
339/*
340 * cthread_mutex protects all of the cthread private structures.
341 */
342INTERNAL rpc_mutex_t            cthread_mutex;
343
344/*
345 * A global that controls the overall ability of RPCs to be assigned
346 * to a pool/thread for execution (i.e. it controls rpc__cthread_invoke_null).
347 */
348INTERNAL boolean                cthread_invoke_enabled;
349
350/*
351 * A global that points to the application specified thread pool lookup function.
352 */
353INTERNAL rpc_thread_pool_fn_t   cthread_pool_lookup_fn;
354
355
/*
 * Forward declarations of routines private to this module.
 */

INTERNAL void cthread_create (
        cthread_elt_p_t          /*cthread*/,
        unsigned32              * /*status*/
    );

INTERNAL void cthread_call_executor (
        cthread_elt_p_t        /*cthread*/
    );

INTERNAL void cthread_reaper (
        dce_pointer_t    /*arg*/
    );

INTERNAL cthread_pool_elt_p_t cthread_pool_alloc (
        unsigned32   /*n_threads*/,
        boolean32    /*is_default_pool*/,
        unsigned32  * /*status*/
    );

INTERNAL void cthread_pool_set_threadcnt (
        cthread_pool_elt_p_t  /*p*/,
        unsigned32   /*n_threads*/,
        unsigned32  * /*status*/
    );

INTERNAL void cthread_pool_free (
        cthread_pool_elt_p_t  /*p*/,
        unsigned32  * /*status*/
    );

INTERNAL void cthread_pool_start (
        cthread_pool_elt_p_t  /*p*/,
        unsigned32  * /*status*/
    );

INTERNAL void cthread_pool_stop (
        cthread_pool_elt_p_t  /*p*/,
        unsigned32  /*wait_flag*/,
        unsigned32  * /*status*/
    );

INTERNAL cthread_elt_p_t cthread_pool_assign_thread (
        cthread_pool_elt_p_t     /*p*/
    );

INTERNAL void cthread_pool_queue_call (
        cthread_pool_elt_p_t     /*p*/,
        rpc_call_rep_p_t         /*call_rep*/,
        unsigned32              * /*status*/
    );

INTERNAL rpc_call_rep_p_t cthread_pool_dequeue_first (
        cthread_pool_elt_p_t     /*p*/
    );

INTERNAL boolean32 cthread_call_dequeue (
        rpc_call_rep_p_t         /*call_rep*/
    );
414
415
416/*
417**++
418**
419**  ROUTINE NAME:       cthread_create
420**
421**  SCOPE:              INTERNAL
422**
423**  DESCRIPTION:
424**
425**  Create a call thread and initialize the table entry.
426**
427**  INPUTS:
428**
429**      cthread         The cthread_table entry to use.
430**
431**  INPUTS/OUTPUTS:     none
432**
433**  OUTPUTS:
434**
435**      status          A value indicating the status of the routine.
436**
437**          rpc_s_ok
438**
439**  IMPLICIT INPUTS:    none
440**
441**  IMPLICIT OUTPUTS:   none
442**
443**  FUNCTION VALUE:     none
444**
445**  SIDE EFFECTS:       none
446**
447**--
448**/
449
450INTERNAL void cthread_create
451(
452    cthread_elt_p_t volatile cthread,
453    unsigned32              *status
454)
455{
456    dcethread*               handle_copy;
457
458    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);
459
460    CODING_ERROR (status);
461
462    /*
463     * Create a thread for this entry, passing it a pointer its
464     * call thread table entry.  Detach the thread since no one
465     * ever joins with the thread (and we don't want it to become
466     * forever zombie'd when it terminates).
467     */
468    DCETHREAD_TRY {
469        dcethread_create_throw (&cthread->thread_id,
470                    &rpc_g_server_dcethread_attr,
471                    (dcethread_startroutine)cthread_call_executor,
472                    (dcethread_addr)cthread);
473
474        cthread->thread_state = RPC_C_IDLE_CTHREAD;
475
476        handle_copy = cthread->thread_id;
477        dcethread_detach_throw(handle_copy);
478
479        *status = rpc_s_ok;
480    } DCETHREAD_CATCH_ALL(THIS_CATCH) {
481        *status = rpc_s_cthread_create_failed;
482		  /* FIXME MNE */
483		  fprintf(stderr, "XXX MIREK: %s: %s: %d: cthread creation failure\n",
484				  __FILE__, __PRETTY_FUNCTION__, __LINE__);
485    } DCETHREAD_ENDTRY
486
487    return;
488}
489
490/*
491**++
492**
493**  ROUTINE NAME:       cthread_call_executor
494**
495**  SCOPE:              INTERNAL
496**
497**  DESCRIPTION:
498**
499**  The base routine of all call executor threads.  Loop awaiting
500**  and processing calls until told to stop.
501**
502**  INPUTS:
503**
504**      cthread         Pointer to the thread's call table element
505**
506**  INPUTS/OUTPUTS:     none
507**
508**  OUTPUTS:            none
509**
510**  IMPLICIT INPUTS:    none
511**
512**  IMPLICIT OUTPUTS:   none
513**
514**  FUNCTION VALUE:     none
515**
516**  SIDE EFFECTS:       none
517**
518**--
519**/
520
INTERNAL void cthread_call_executor
(
  cthread_elt_p_t cthread
)
{
    rpc_call_rep_t          *call_rep = NULL;
    rpc_cthread_pvt_info_p_t pvt = NULL;
    cthread_pool_elt_p_t    p = cthread->pool;
    boolean                 skip_startup = true;    /* T => skip the idle wait and go straight to queued calls */

    /*
     * Call executors execute with general cancelability disabled
     * until the stub dispatched to the manager.  This prevents the
     * call executor from having a pending cancel delivered to it before
     * the manager is called.
     */
    dcethread_enableinterrupt_throw(0);

    RPC_MUTEX_LOCK (cthread_mutex);

    /*
     * If there is nothing queued for the pool at startup, take the
     * normal path below (go idle and wait for an assignment).
     * Otherwise leave skip_startup true so the first loop iteration
     * bypasses the idle wait and immediately services the queue.
     */
    if (CTHREAD_POOL_IS_QUEUE_EMPTY(p))
    {
	skip_startup = false;
    }

    /*
     * Loop executing calls until we're told to exit.
     */
    while (true)
    {
        boolean run_queued_call = false;

	if (!skip_startup)
	{
	    /*
	     * Update the pool's idle thread info.
	     */
	    CTHREAD_POOL_IDLE_THREAD(p, cthread);

	    /*
	     * Wait for a call assignment (or until we're told to exit).
	     */
	    while (cthread->thread_state == RPC_C_IDLE_CTHREAD && ! p->stop)
	    {
		RPC_COND_WAIT (cthread->thread_cond, cthread_mutex);
	    }

	    /*
	     * If we've been told to stop, then do so.
	     */
	    if (p->stop)
	    {
		break;
	    }

	    /*
	     * Setup the call that was assigned to us.
	     */
	    call_rep = cthread->call_rep;
	    assert(call_rep != NULL);
	    pvt = &call_rep->u.server.cthread;
	}
        /*
         * Execute the call assigned to us, followed by any queued calls.
         * (On a skip_startup iteration there is no assigned call yet,
         * so the execution step is bypassed and we go straight to the
         * dequeue step below.)
         */
        do
        {
	    if (!skip_startup)
	    {
		RPC_DBG_PRINTF (rpc_e_dbg_general, 15,
		    ("(cthread_call_executor) pool %p cthread %p executing call_rep %p\n",
			p, cthread, call_rep));

		/*
		 * Unlock the cthread_mutex while the call is executing.
		 */
		RPC_MUTEX_UNLOCK (cthread_mutex);

		/*
		 * Invoke the routine provided when this thread was invoked
		 * with the argument provided.  The routine is always called
		 * with general cancelability disabled AND WITH THE CALL LOCKED.
		 * Since we don't have reference counts (in the common code)
		 * this call reference and lock is 'handed off' to the routine
		 * (which is responsible for releasing the lock).  Upon completion
		 * of the 'routine' we can no longer reference the call (it may
		 * no longer exist).
		 */

		RPC_CALL_LOCK(cthread->call_rep);
		(*(pvt->executor)) (pvt->optargs, run_queued_call);

		/*
		 * Reacquire the cthread mutex and check for queued calls.
		 * As the above comment says, we no longer hold the call lock
		 * at this point.
		 */

		RPC_MUTEX_LOCK (cthread_mutex);
	    }
            /*
             * Select the oldest queued call; remove it from its queue(s)
             * and setup to execute it.  skip_startup only ever applies to
             * the first pass, so clear it here.
             */

	    skip_startup = false;
            if (CTHREAD_POOL_IS_QUEUE_EMPTY(p))
            {
                run_queued_call = false;
                continue;
            }
            call_rep = cthread_pool_dequeue_first(p);
            pvt = &call_rep->u.server.cthread;

            /*
             * Fill in the thread_h of the protocol specific call
             * handle for use by the protocol module.
             */
            pvt->thread_h = cthread->thread_id;

            /*
             * Update the cthread table entry for this call just to be
             * consistent.
             */
            cthread->call_rep = call_rep;

            /*
             * Indicate there's a queued call to process.
             */
            run_queued_call = true;
        } while (run_queued_call);

        /*
         * Free up this thread to be allocated again.
         */
        cthread->thread_state = RPC_C_IDLE_CTHREAD;
    }

    RPC_DBG_PRINTF (rpc_e_dbg_general, 5,
        ("(cthread_call_executor) pool %p cthread %p stopped\n",
        p, cthread));

    /*
     * Notify others that the cthread is exiting.
     */

    cthread->thread_state = RPC_C_NO_CTHREAD;
    RPC_COND_BROADCAST (cthread->thread_cond, cthread_mutex);

    RPC_MUTEX_UNLOCK (cthread_mutex);
}
672
673
674/*
675**++
676**
677**  ROUTINE NAME:       cthread_reaper
678**
679**  SCOPE:              INTERNAL
680**
681**  DESCRIPTION:
682**
683**  Free pools as they become idle
684**  (this is run periodically from the timer thread).
685**
686**  INPUTS:             none
687**
688**  INPUTS/OUTPUTS:     none
689**
690**  OUTPUTS:            none
691**
692**  IMPLICIT INPUTS:
693**      cthread_reaper_queue    the queue of waiting to be freed pools
694**
695**  IMPLICIT OUTPUTS:   none
696**
697**  FUNCTION VALUE:     none
698**
699**  SIDE EFFECTS:       none
700**
701**--
702**/
703
704INTERNAL void cthread_reaper
705(
706  dce_pointer_t   unused_arg ATTRIBUTE_UNUSED
707)
708{
709    cthread_pool_elt_p_t    p, np;
710    unsigned32              i;
711    unsigned32              st;
712    cthread_elt_p_t         cthread;
713    boolean                 free_pool;
714
715    RPC_MUTEX_LOCK (cthread_mutex);
716
717    /*
718     * Scan the queue looking for (and freeing) idle pools.
719     */
720    RPC_LIST_FIRST(cthread_reaper_queue, p, cthread_pool_elt_p_t);
721    while (p != NULL)
722    {
723        free_pool = true;
724        if (p->ctbl != NULL)
725        {
726            /*
727             * See if all of the pool's threads have completed.
728             */
729            for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++)
730            {
731                if (cthread->thread_state != RPC_C_NO_CTHREAD)
732                {
733                    free_pool = false;
734                    break;
735                }
736            }
737        }
738
739        if (! free_pool)
740        {
741            RPC_LIST_NEXT (p, p, cthread_pool_elt_p_t);
742            continue;
743        }
744
745        RPC_DBG_PRINTF (rpc_e_dbg_general, 5,
746            ("(cthread_reaper) freeing pool %p\n", p));
747
748        /*
749         * Remove the pool from the reaper's queue (pool free really
750         * frees the storage)... but first, determine the "next pool"
751         * so we can continue the scan.
752         */
753
754        RPC_LIST_NEXT (p, np, cthread_pool_elt_p_t);
755        RPC_LIST_REMOVE (cthread_reaper_queue, p);
756
757        /*
758         * Free up the pool's descriptor.
759         */
760        cthread_pool_free(p, &st);
761
762        /*
763         * Continue scanning with the next on the list.
764         */
765        p = np;
766    }
767
768    /*
769     * Shutdown the reaper timer when there's nothing to reap.
770     */
771    if (RPC_LIST_EMPTY(cthread_reaper_queue))
772        rpc__timer_clear(&cthread_reaper_timer);
773
774    RPC_MUTEX_UNLOCK (cthread_mutex);
775}
776
777/*
778**++
779**
780**  ROUTINE NAME:       cthread_pool_alloc
781**
782**  SCOPE:              INTERNAL
783**
784**  DESCRIPTION:
785**
786**  Allocate the resources for a pool (cthread_pool_start() actually creates the
787**  threads).
788**
789**  INPUTS:
790**
791**      n_threads       number of call threads in the pool
792**
793**  INPUTS/OUTPUTS:     none
794**
795**  OUTPUTS:
796**
797**      status          A value indicating the status of the routine.
798**
799**          rpc_s_ok
800**
801**  IMPLICIT INPUTS:    none
802**
803**  IMPLICIT OUTPUTS:   none
804**
805**  FUNCTION VALUE:
806**
807**      p               the created pool
808**
809**  SIDE EFFECTS:       none
810**
811**--
812**/
813
814INTERNAL cthread_pool_elt_p_t cthread_pool_alloc
815(
816    unsigned32  n_threads,
817    boolean32   is_default_pool,
818    unsigned32  *status
819)
820{
821    cthread_pool_elt_p_t    p = NULL;
822
823    CODING_ERROR (status);
824
825    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);
826
827    /*
828     * Check for the sanity of the number of threads.
829     */
830    if (n_threads == 0)
831    {
832        *status = rpc_s_max_calls_too_small;
833        return p;
834    }
835
836    /*
837     * Alloc a pool descriptor.
838     */
839    RPC_MEM_ALLOC (p,
840                   cthread_pool_elt_p_t,
841                   sizeof (cthread_pool_elt_t),
842                   RPC_C_MEM_CTHREAD_POOL,
843                   RPC_C_MEM_WAITOK);
844
845    if (p == NULL)
846    {
847        *status = rpc_s_no_memory;
848        return p;
849    }
850
851    /*
852     * Init the fields in the pool descriptor.
853     */
854    RPC_LIST_INIT (p->link);
855    p->n_threads    = n_threads;
856    p->n_idle       = 0;
857    p->ctbl         = NULL;
858    p->idle_cthread = NULL;
859    p->n_queued     = 0;
860
861    /*
862     * If the application has indicated a preference for the call queue depth
863     * of the default pool, use that.  Otherwise, default to 8 times the number
864     * of threads in the pool.
865     */
866    if (is_default_pool && cthread_default_call_queue_size != 0)
867        p->max_queued   = cthread_default_call_queue_size;
868    else
869        p->max_queued   = RPC_C_CTHREAD_QUEUE_MULTIPLIER * n_threads;
870
871    RPC_LIST_INIT (p->call_queue);
872    RPC_LIST_INIT (p->free_queue);
873    p->stop         = false;
874    p->queue_elt_alloc = ! is_default_pool;
875
876    *status = rpc_s_ok;
877
878/*
879CLEANUP:
880*/
881    if (*status != rpc_s_ok)
882    {
883        if (p != NULL)
884            RPC_MEM_FREE (p, RPC_C_MEM_CTHREAD_POOL);
885        p = NULL;
886    }
887
888    return p;
889}
890
891/*
892**++
893**
894**  ROUTINE NAME:       cthread_pool_set_threadcnt
895**
896**  SCOPE:              INTERNAL
897**
898**  DESCRIPTION:
899**
900**  Modify the number of threads associated with the pool
**  This is not intended to generically work; this is only
**  supposed to work on "idle" pools (alloc'ed but not started,
**  or started and then stopped).
904**
905**  INPUTS:
906**
**      p               the pool whose count to modify
908**      n_threads       the new number of threads
909**
910**  INPUTS/OUTPUTS:     none
911**
912**  OUTPUTS:
913**
914**      status          A value indicating the status of the routine.
915**
916**          rpc_s_ok
917**
918**  IMPLICIT INPUTS:    none
919**
920**  IMPLICIT OUTPUTS:   none
921**
922**  FUNCTION VALUE:     none
923**
924**  SIDE EFFECTS:       none
925**
926**--
927**/
928
929INTERNAL void cthread_pool_set_threadcnt
930(
931    cthread_pool_elt_p_t p,
932    unsigned32  n_threads,
933    unsigned32  *status
934)
935{
936    CODING_ERROR (status);
937
938    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);
939
940    /*
941     * Check for the sanity of the number of threads.
942     */
943    if (n_threads == 0)
944    {
945        *status = rpc_s_max_calls_too_small;
946        return;
947    }
948
949    p->n_threads    = n_threads;
950
951    /*
952     * Use a default call queue size if we're operating on a private pool,
953     * or if this is the default pool and the application hasn't previously
954     * specified a default call queue size for the default pool.
955     */
956    if (p != cthread_default_pool || cthread_default_call_queue_size == 0)
957        p->max_queued   = RPC_C_CTHREAD_QUEUE_MULTIPLIER * n_threads;
958
959    *status = rpc_s_ok;
960}
961
962/*
963**++
964**
965**  ROUTINE NAME:       cthread_pool_free
966**
967**  SCOPE:              INTERNAL
968**
969**  DESCRIPTION:
970**
971**  Free the (assumed idle) pool's resources.
972**
973**  INPUTS:
974**
975**      p               the pool to free
976**
977**  INPUTS/OUTPUTS:     none
978**
979**  OUTPUTS:
980**
981**      status          A value indicating the status of the routine.
982**
983**          rpc_s_ok
984**
985**  IMPLICIT INPUTS:    none
986**
987**  IMPLICIT OUTPUTS:   none
988**
989**  FUNCTION VALUE:     none
990**
991**  SIDE EFFECTS:       none
992**
993**--
994**/
995
996INTERNAL void cthread_pool_free
997(
998    cthread_pool_elt_p_t p,
999    unsigned32  *status
1000)
1001{
1002    unsigned32              i;
1003    cthread_elt_p_t         cthread;
1004
1005    CODING_ERROR (status);
1006
1007    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);
1008
1009    /*
1010     * The assumption is that the pool is idle (all of its threads
1011     * have terminated).
1012     */
1013
1014    /*
1015     * Clean up and free the ctbl.  If there is a ctbl, the assumption
1016     * is that all of the ctable's entries have valid initialized cv's.
1017     */
1018    if (p->ctbl)
1019    {
1020        for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++)
1021        {
1022            RPC_COND_DELETE (cthread->thread_cond, cthread_mutex);
1023        }
1024        RPC_MEM_FREE (p->ctbl, RPC_C_MEM_CTHREAD_CTBL);
1025        p->ctbl = NULL;
1026    }
1027
1028    /*
1029     * Free up the queue elt table.
1030     */
1031    while (! RPC_LIST_EMPTY(p->free_queue))
1032    {
1033        cthread_queue_elt_p_t qe;
1034
1035        RPC_LIST_REMOVE_TAIL(p->free_queue, qe, cthread_queue_elt_p_t);
1036        RPC_MEM_FREE (qe, RPC_C_MEM_CTHREAD_QETBL);
1037    }
1038
1039    /*
1040     * Free the pool descriptor.
1041     */
1042    RPC_MEM_FREE (p, RPC_C_MEM_CTHREAD_POOL);
1043
1044    *status = rpc_s_ok;
1045}
1046
1047/*
1048**++
1049**
1050**  ROUTINE NAME:       cthread_pool_start
1051**
1052**  SCOPE:              INTERNAL
1053**
1054**  DESCRIPTION:
1055**
1056**  Start up the call execution threads for an existing pool.
1057**
1058**  INPUTS:
1059**
1060**      p               the pool to start
1061**      n_threads       number of call threads in the pool
1062**
1063**  INPUTS/OUTPUTS:     none
1064**
1065**  OUTPUTS:
1066**
1067**      status          A value indicating the status of the routine.
1068**
1069**          rpc_s_ok
1070**
1071**  IMPLICIT INPUTS:    none
1072**
1073**  IMPLICIT OUTPUTS:   none
1074**
1075**  FUNCTION VALUE:     none
1076**
1077**  SIDE EFFECTS:       none
1078**
1079**--
1080**/
1081
INTERNAL void cthread_pool_start
(
    cthread_pool_elt_p_t p,
    unsigned32  *status
)
{
    unsigned32              i;
    cthread_elt_p_t         cthread;

    CODING_ERROR (status);

    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);

    /*
     * The pool should not currently have any actual call threads.
     * NOTE(review): if a stale ctbl does exist, it is only logged and
     * then overwritten by the allocation below without being freed
     * ("orphaned") -- confirm this leak is acceptable/intentional.
     */
    if (p->ctbl != NULL)
    {
        RPC_DBG_GPRINTF (
            ("(cthread_pool_start) pool %p orphaning ctbl\n", p));
    }

    /*
     * Allocate the pool's call thread table (one entry per call thread).
     */
    RPC_MEM_ALLOC (p->ctbl,
                   cthread_elt_p_t,
                   p->n_threads * (sizeof (cthread_elt_t)),
                   RPC_C_MEM_CTHREAD_CTBL,
                   RPC_C_MEM_WAITOK);

    if (p->ctbl == NULL)
    {
        *status = rpc_s_no_memory;
        return;
    }

    /*
     * Init the pool's cthread table / create the cthreads.
     * Do this in two phases to ensure that the table is
     * sane in the event that thread creation fails and cleanup
     * is necessary.
     */

    /*
     * Phase 1: mark every entry "no thread" and initialize its cv, so
     * a partially-started table can be cleaned up uniformly.
     */
    for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++)
    {
        cthread->pool = p;
        cthread->thread_state = RPC_C_NO_CTHREAD;
        RPC_COND_INIT (cthread->thread_cond, cthread_mutex);
    }

    /*
     * Phase 2: actually spawn the call execution threads.
     */
    for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++)
    {
        cthread_create(cthread, status);
        if (*status != rpc_s_ok)
        {
            RPC_DBG_GPRINTF (
                ("(cthread_pool_start) pool %p couldn't create thread %d\n", p, i));
            goto CLEANUP;
        }
    }

    /*
     * Setup additional fields in the pool descriptor.
     */
    p->n_idle       = 0;
    p->idle_cthread = NULL;
    p->n_queued     = 0;
    RPC_LIST_INIT (p->call_queue);
    RPC_LIST_INIT (p->free_queue);

    /*
     * Preallocate the pool's call queue elements if this pool is
     * configured to use them (queue_elt_alloc).
     */
    if (p->queue_elt_alloc)
    {
        for (i = 0; i < p->max_queued; i++)
        {
            cthread_queue_elt_p_t qe;

            RPC_MEM_ALLOC (qe,
                           cthread_queue_elt_p_t,
                           sizeof (cthread_queue_elt_t),
                           RPC_C_MEM_CTHREAD_QETBL,
                           RPC_C_MEM_WAITOK);

            if (qe == NULL)
            {
                *status = rpc_s_no_memory;
                goto CLEANUP;
            }

            qe->pool = p;
            RPC_LIST_ADD_TAIL (p->free_queue, qe, cthread_queue_elt_p_t);
        }
    }

    RPC_DBG_PRINTF (rpc_e_dbg_general, 5,
        ("(cthread_pool_start) pool %p (%d threads)\n", p, p->n_threads));

    /*
     * Tell the pool's threads to start.
     */
    p->stop         = false;

    *status = rpc_s_ok;

    /*
     * The success path falls through the label with *status == rpc_s_ok,
     * so the cleanup body below executes only on failure.
     */
CLEANUP:

    if (*status != rpc_s_ok)
    {
        unsigned32  st;

        if (p->ctbl != NULL)
        {
            /*
             * Stop (and wait for) any threads that did get created; with
             * wait == true this also deletes the cv's and frees the ctbl.
             */
            cthread_pool_stop(p, true /* wait */, &st);
            p->ctbl = NULL;
        }

        /*
         * Release any queue elements that were allocated before the failure.
         */
        while (! RPC_LIST_EMPTY(p->free_queue))
        {
            cthread_queue_elt_p_t qe;

            RPC_LIST_REMOVE_TAIL(p->free_queue, qe, cthread_queue_elt_p_t);
            RPC_MEM_FREE (qe, RPC_C_MEM_CTHREAD_QETBL);
        }
    }
}
1210
1211/*
1212**++
1213**
1214**  ROUTINE NAME:       cthread_pool_stop
1215**
1216**  SCOPE:              INTERNAL
1217**
1218**  DESCRIPTION:
1219**
**  Stop the pool's call threads.
1221**
1222**  INPUTS:
1223**
1224**      p               the pool to stop
1225**      wait_flag       T => wait for threads to stop
1226**
1227**  INPUTS/OUTPUTS:     none
1228**
1229**  OUTPUTS:
1230**
1231**      status          A value indicating the status of the routine.
1232**
1233**          rpc_s_ok
1234**
1235**  IMPLICIT INPUTS:    none
1236**
1237**  IMPLICIT OUTPUTS:   none
1238**
1239**  FUNCTION VALUE:     none
1240**
1241**  SIDE EFFECTS:       none
1242**
1243**--
1244**/
1245
INTERNAL void cthread_pool_stop
(
    cthread_pool_elt_p_t p,
    unsigned32 wait_flag,
    unsigned32  *status
)
{
    int                     cs;     /* saved cancel/interrupt state */
    unsigned32              i;
    cthread_elt_p_t         cthread;

    CODING_ERROR (status);

    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);

    /*
     * If there are no threads associated with the pool, we're done.
     */
    if (p->ctbl == NULL)
    {
        *status = rpc_s_ok;
        return;
    }

    RPC_DBG_PRINTF (rpc_e_dbg_general, 5,
            ("(cthread_pool_stop) pool %p (%d threads) stopping\n",
            p, p->n_threads));

    /*
     * Tell the threads to stop when they complete their current activities.
     */
    p->stop = true;

    /*
     * Unblock any waiting call threads so they detect the 'stop' condition.
     * (Entries still in RPC_C_NO_CTHREAD have no thread to wake.)
     */
    for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++)
    {
        if (cthread->thread_state != RPC_C_NO_CTHREAD)
        {
            RPC_COND_SIGNAL (cthread->thread_cond, cthread_mutex);
        }
    }

    /*
     * If not waiting, we're done; the threads will wind down on their own
     * and the ctbl remains for later cleanup (e.g. via the reaper path).
     */
    if (!wait_flag)
    {
        *status = rpc_s_ok;
        return;
    }

    /*
     * Disable cancel delivery while awaiting cthread termination.  This
     * ensures completion and preservation of invariants.  If it becomes
     * necessary, we can allow cancels and setup a cleanup handler and
     * in the event of a cancel, queue the pool to the reaper for final
     * cleanup.
     */
    cs = dcethread_enableinterrupt_throw (0);

    /*
     * Wait for all call threads to complete.
     *
     * We wait on the call thread's private cv; the cthread signals its
     * cv prior to exiting.  While dcethread_join() would have done the
     * trick; this scheme works just as well and is portable to environments
     * that may have difficulty implementing join (i.e. for Kernel RPC).
     * A thread that has exited leaves its entry in RPC_C_NO_CTHREAD state.
     */
    for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++)
    {
        while (cthread->thread_state != RPC_C_NO_CTHREAD)
        {
            RPC_COND_WAIT (cthread->thread_cond, cthread_mutex);
        }
    }

    /*
     * Restore the cancel state.
     */
    dcethread_enableinterrupt_throw (cs);

    RPC_DBG_PRINTF (rpc_e_dbg_general, 5,
            ("(cthread_pool_stop) pool %p (%d threads) stopped\n",
            p, p->n_threads));

    /*
     * Clean up and free the ctbl.  If there is a ctbl, the assumption
     * is that all of the ctable's entries have valid initialized cv's.
     */
    for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++)
    {
        RPC_COND_DELETE (cthread->thread_cond, cthread_mutex);
    }
    RPC_MEM_FREE (p->ctbl, RPC_C_MEM_CTHREAD_CTBL);
    p->ctbl = NULL;

    /*
     * Free up the queue elt list.
     */
    while (! RPC_LIST_EMPTY(p->free_queue))
    {
        cthread_queue_elt_p_t qe;

        RPC_LIST_REMOVE_TAIL(p->free_queue, qe, cthread_queue_elt_p_t);
        RPC_MEM_FREE (qe, RPC_C_MEM_CTHREAD_QETBL);
    }

    *status = rpc_s_ok;
}
1357
1358/*
1359**++
1360**
1361**  ROUTINE NAME:       cthread_pool_assign_thread
1362**
1363**  SCOPE:              INTERNAL
1364**
1365**  DESCRIPTION:
1366**
1367**  Locate an idle thread in the indicated pool.
1368**
1369**  INPUTS:
1370**
1371**      p               the pool to search
1372**
1373**  INPUTS/OUTPUTS:     none
1374**
1375**  OUTPUTS:
1376**
1377**      status          A value indicating the status of the routine.
1378**
1379**          rpc_s_ok
1380**
1381**  IMPLICIT INPUTS:    none
1382**
1383**  IMPLICIT OUTPUTS:   none
1384**
1385**  FUNCTION VALUE:
1386**
1387**      cthread         the assigned thread (NULL if none found)
1388**
1389**  SIDE EFFECTS:       none
1390**
1391**--
1392**/
1393
1394INTERNAL cthread_elt_p_t cthread_pool_assign_thread
1395(
1396    cthread_pool_elt_p_t    p
1397)
1398{
1399    cthread_elt_p_t cthread = NULL;
1400
1401    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);
1402
1403    /*
1404     * Locate an idle call thread (if one exists).
1405     */
1406    if (p->n_idle > 0)
1407    {
1408        if (p->idle_cthread != NULL)
1409        {
1410            cthread = p->idle_cthread;
1411            assert(cthread->thread_state == RPC_C_IDLE_CTHREAD);
1412            p->idle_cthread = NULL;
1413        }
1414        else
1415        {
1416            cthread_elt_p_t ct;
1417
1418            for (ct = p->ctbl; ct < &p->ctbl[p->n_threads]; ct++)
1419            {
1420                if (ct->thread_state == RPC_C_IDLE_CTHREAD)
1421                {
1422                    cthread = ct;
1423                    break;
1424                }
1425            }
1426        }
1427    }
1428
1429    if (cthread != NULL)
1430    {
1431        cthread->thread_state = RPC_C_ACTIVE_CTHREAD;
1432        p->n_idle--;
1433    }
1434
1435    return cthread;
1436}
1437
1438/*
1439**++
1440**
1441**  ROUTINE NAME:       cthread_pool_queue_call
1442**
1443**  SCOPE:              INTERNAL
1444**
1445**  DESCRIPTION:
1446**
1447**  Attempt to queue a call for deferred execution.
1448**
1449**  INPUTS:
1450**
1451**      p               the call's pool
1452**      call_rep        the call
1453**
1454**  INPUTS/OUTPUTS:     none
1455**
1456**  OUTPUTS:
1457**
1458**      status          A value indicating the status of the routine.
1459**
1460**          rpc_s_ok
1461**          rpc_s_cthread_not_found
1462**
1463**  IMPLICIT INPUTS:    none
1464**
1465**  IMPLICIT OUTPUTS:   none
1466**
1467**  FUNCTION VALUE:     none
1468**
1469**  SIDE EFFECTS:       none
1470**
1471**--
1472**/
1473
INTERNAL void cthread_pool_queue_call
(
    cthread_pool_elt_p_t    p,
    rpc_call_rep_p_t        call_rep,
    unsigned32              *status
)
{
    rpc_cthread_pvt_info_p_t    pvt = &call_rep->u.server.cthread;
    boolean                     is_default_pool = (p == cthread_default_pool);

    CODING_ERROR (status);

    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);

    /*
     * If the queue is full, we're done (caller sees "no cthread found").
     */
    if (CTHREAD_POOL_IS_QUEUE_FULL (p))
    {
        RPC_DBG_GPRINTF ((
            "(cthread_pool_queue_call) pool %p full call_rep %p\n", p, call_rep));
        *status = rpc_s_cthread_not_found;
        return;
    }

    /*
     * Indicate that the call is queued.
     */
    pvt->is_queued = true;

    /*
     * Always add the call to the default pool's queue (every queued call
     * lives on the default pool's call_rep queue, regardless of pool).
     *
     * ONLY Update the default pool's n_queued if the call is for the
     * default pool (see the cthread_pool_elt description comments above)!
     */
    RPC_LIST_ADD_TAIL (cthread_default_pool->call_queue,
                        call_rep, rpc_call_rep_p_t);
    if (is_default_pool)
    {
        /* Default-pool calls use no indirection queue element. */
        pvt->qelt = NULL;
        p->n_queued++;
    }

    /*
     * If it's a reserved pool, add it to its queue too, via an
     * indirection queue element taken from the pool's free list.
     */
    if (! is_default_pool)
    {
        cthread_queue_elt_p_t qelt;

        /* The queue-full check above guarantees a free element exists. */
        RPC_LIST_REMOVE_HEAD(p->free_queue, qelt, cthread_queue_elt_p_t);
        assert (qelt != NULL);

        qelt->call_rep = call_rep;
        pvt->qelt = (dce_pointer_t)qelt;

        RPC_LIST_ADD_TAIL (p->call_queue, qelt, cthread_queue_elt_p_t);
        p->n_queued++;
    }

    RPC_DBG_PRINTF (rpc_e_dbg_general, 5,
        ("(cthread_pool_queue_call) pool %p (now %d) call_rep %p\n",
            p, p->n_queued, call_rep));

    *status = rpc_s_ok;
}
1541
1542/*
1543**++
1544**
1545**  ROUTINE NAME:       cthread_pool_dequeue_first
1546**
1547**  SCOPE:              INTERNAL
1548**
1549**  DESCRIPTION:        Remove the first queued call rep from a pool.
1550**
1551**  INPUTS:             none
1552**
1553**  INPUTS/OUTPUTS:
1554**
1555**      p               The pool of interest
1556**
1557**  OUTPUTS:            none
1558**
1559**  IMPLICIT INPUTS:    none
1560**
1561**  IMPLICIT OUTPUTS:   none
1562**
1563**  FUNCTION VALUE:
1564**
1565**      call_rep        The dequeued call rep (may be NULL).
1566**
1567**  SIDE EFFECTS:       none
1568**
1569**--
1570**/
1571
INTERNAL rpc_call_rep_p_t cthread_pool_dequeue_first
(
    cthread_pool_elt_p_t    p
)
{
    rpc_call_rep_p_t            call_rep;
    boolean                     is_default_pool = (p == cthread_default_pool);

    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);

    /*
     * If the queue is empty we're done (no call to hand back).
     */
    if (CTHREAD_POOL_IS_QUEUE_EMPTY(p))
    {
        return NULL;
    }

    /*
     * Determine the call rep of interest and then dequeue it.
     */
    if (is_default_pool)
    {
        /*
         * The default pool's queue is the queue of call reps.
         */
        RPC_LIST_FIRST (p->call_queue,
                          call_rep,
                          rpc_call_rep_p_t);
    }
    else
    {
        cthread_queue_elt_p_t       qelt;

        /*
         * The call was really for a reserved pool; determine the
         * call rep via the indirection queue elt.
         */
        RPC_LIST_FIRST (p->call_queue,
                          qelt,
                          cthread_queue_elt_p_t);

        call_rep = qelt->call_rep;
        assert ((cthread_queue_elt_p_t)call_rep->u.server.cthread.qelt == qelt);
    }

    /*
     * cthread_call_dequeue removes the call from both the default pool's
     * queue and (for reserved pools) the pool's indirection queue; the
     * boolean result is uninteresting here since we know it was queued.
     */
    (void) cthread_call_dequeue (call_rep);

    return call_rep;
}
1622
1623/*
1624**++
1625**
1626**  ROUTINE NAME:       cthread_call_dequeue
1627**
1628**  SCOPE:              INTERNAL
1629**
1630**  DESCRIPTION:        Remove a call rep from the call executor
1631**                      thread waiting queue, if it's there.
1632**
1633**  INPUTS:             none
1634**
1635**  INPUTS/OUTPUTS:
1636**
1637**      call_rep        The call rep to be dequeued.
1638**
1639**  OUTPUTS:            none
1640**
1641**  IMPLICIT INPUTS:    none
1642**
1643**  IMPLICIT OUTPUTS:   none
1644**
1645**  FUNCTION VALUE:     boolean
1646**
1647**                      T => call was previously queued.
1648**
1649**  SIDE EFFECTS:       none
1650**
1651**--
1652**/
1653
INTERNAL boolean32 cthread_call_dequeue
(
    rpc_call_rep_p_t        call_rep
)
{
    rpc_cthread_pvt_info_p_t    pvt = &call_rep->u.server.cthread;
    cthread_queue_elt_p_t       qelt = (cthread_queue_elt_p_t)pvt->qelt;
    cthread_pool_elt_p_t        p;

    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);

    /*
     * If the call's not queued, we're done.
     */
    if (! pvt->is_queued)
    {
        return false;
    }

    /*
     * Dequeue the call from the default pool (every queued call is on
     * the default pool's call_rep queue).
     */
    RPC_LIST_REMOVE (cthread_default_pool->call_queue, call_rep);

    /*
     * The call may or may not have been for the default pool; a NULL
     * qelt means it was a default-pool call.
     */
    if (qelt == NULL)
    {
        /*
         * The call was for the default pool; adjust the
         * default pool queue count (see the cthread_pool_elt
         * description).
         */
        p = cthread_default_pool;
        cthread_default_pool->n_queued--;
    }
    else
    {
        /*
         * The call was really for a reserved pool;
         * remove it from that queue too.
         */
        p = qelt->pool;

        assert (qelt->call_rep == call_rep);
        assert ((cthread_queue_elt_p_t)pvt->qelt == qelt);

        RPC_LIST_REMOVE (p->call_queue, qelt);
        p->n_queued--;

        /*
         * Return the queue elt to its free list.
         */
        qelt->call_rep = NULL;
        RPC_LIST_ADD_HEAD  (p->free_queue,
                          qelt,
                          cthread_queue_elt_p_t);
    }

    /*
     * The call is no longer queued.
     */
    pvt->is_queued = false;
    pvt->qelt = NULL;

    RPC_DBG_PRINTF (rpc_e_dbg_general, 5,
        ("(cthread_call_dequeue) pool %p (%d remain) call_rep %p\n",
        p, p->n_queued, call_rep));

    return true;
}
1726
1727/*
1728**++
1729**
1730**  ROUTINE NAME:       rpc__cthread_init
1731**
1732**  SCOPE:              PRIVATE - declared in comcthd.h
1733**
1734**  DESCRIPTION:
1735**
1736**  Initialize the cthread package.
1737**
1738**  INPUTS:             none
1739**
1740**  INPUTS/OUTPUTS:     none
1741**
1742**  OUTPUTS:
1743**
1744**      status          A value indicating the status of the routine.
1745**
1746**          rpc_s_ok
1747**
1748**  IMPLICIT INPUTS:    none
1749**
1750**  IMPLICIT OUTPUTS:   none
1751**
1752**  FUNCTION VALUE:     none
1753**
1754**  SIDE EFFECTS:       none
1755**
1756**--
1757**/
1758
1759PRIVATE void rpc__cthread_init
1760(
1761    unsigned32  *status
1762)
1763{
1764    CODING_ERROR (status);
1765
1766    RPC_MUTEX_INIT (cthread_mutex);
1767
1768    *status = rpc_s_ok;
1769}
1770
1771/*
1772**++
1773**
1774**  ROUTINE NAME:       rpc_server_create_thread_pool
1775**
1776**  SCOPE:              PUBLIC - declared in rpcpvt.idl
1777**
1778**  DESCRIPTION:
1779**
**  Allocate the resources for a pool (cthread_pool_start() actually
**  creates the threads).
1782**
1783**  INPUTS:
1784**
1785**      n_threads       number of call threads in the pool
1786**
1787**  INPUTS/OUTPUTS:     none
1788**
1789**  OUTPUTS:
1790**
1791**      phandle         Handle to the new pool.
1792**      status          A value indicating the status of the routine.
1793**
1794**          rpc_s_ok
1795**
1796**  IMPLICIT INPUTS:    none
1797**
1798**  IMPLICIT OUTPUTS:   none
1799**
1800**  FUNCTION VALUE:     none
1801**
1802**  SIDE EFFECTS:       none
1803**
1804**--
1805**/
1806
PUBLIC void rpc_server_create_thread_pool
(
    unsigned32      n_threads,
    rpc_thread_pool_handle_t *phandle,
    unsigned32      *status
)
{
    cthread_pool_elt_p_t    p;

    CODING_ERROR (status);
    RPC_VERIFY_INIT ();

    *phandle = NULL;

    RPC_MUTEX_LOCK (cthread_mutex);

    p = cthread_pool_alloc(n_threads, false /* is_default_pool */, status);
    if (*status != rpc_s_ok)
        goto CLEANUP;

    /*
     * Make the newly created pool "public".
     */
    RPC_LIST_ADD_TAIL (cthread_reserved_pools, p, cthread_pool_elt_p_t);
    *phandle = (rpc_thread_pool_handle_t) p;

    /*
     * Normally, reserved pools are started up when the default pool
     * gets started, as a consequence of calling rpc_server_listen.
     * However, if the default pool has already been started up, then
     * start up this reserved pool immediately so that it will be available
     * for handling calls.
     *
     * NOTE(review): if this immediate start fails, the pool remains on
     * the reserved list and *phandle stays set while a failure status is
     * returned -- confirm callers expect the handle to remain usable
     * (e.g. freeable via rpc_server_free_thread_pool) in that case.
     */
    if (cthread_invoke_enabled)
        cthread_pool_start (p, status);

CLEANUP:

    RPC_MUTEX_UNLOCK (cthread_mutex);
}
1847
1848/*
1849**++
1850**
1851**  ROUTINE NAME:       rpc_server_free_thread_pool
1852**
1853**  SCOPE:              PUBLIC - declared in rpcpvt.idl
1854**
1855**  DESCRIPTION:
1856**
1857**  Stop the pool's call threads and free the pool resources.
1858**
1859**  INPUTS:
1860**
1861**      phandle         Pool to free
1862**      wait_flag       T => wait for threads to stop
1863**
1864**  INPUTS/OUTPUTS:     none
1865**
1866**  OUTPUTS:
1867**
1868**      status          A value indicating the status of the routine.
1869**
1870**          rpc_s_ok
1871**
1872**  IMPLICIT INPUTS:    none
1873**
1874**  IMPLICIT OUTPUTS:   none
1875**
1876**  FUNCTION VALUE:     none
1877**
1878**  SIDE EFFECTS:       none
1879**
1880**--
1881**/
1882
PUBLIC void rpc_server_free_thread_pool
(
    rpc_thread_pool_handle_t *phandle,
    boolean32       wait_flag,
    unsigned32      *status
)
{
    cthread_pool_elt_p_t    p = (cthread_pool_elt_p_t) *phandle;

    CODING_ERROR (status);
    RPC_VERIFY_INIT ();

    RPC_MUTEX_LOCK (cthread_mutex);

    /*
     * Remove the pool from the set of reserved pools.
     * For all practical external purposes, the reserved pool
     * no longer exists (though its cthreads may still be executing
     * their current (and queued) calls).
     */
    RPC_LIST_REMOVE (cthread_reserved_pools, p);

    /*
     * Stop the pool's threads (waiting as directed).  The stop status
     * is what this routine ultimately returns.
     */
    cthread_pool_stop(p, wait_flag, status);

    /*
     * If we waited for the pool to become idle we can immediately free it;
     * otherwise we've got to queue it for eventual freeing (and start up the
     * reaper timer if this is the first item being queued).
     */
    if (wait_flag || p->ctbl == NULL)
    {
        unsigned32  st;
        /* Best-effort free; its status is intentionally discarded. */
        cthread_pool_free(p, &st);
    }
    else
    {
        if (RPC_LIST_EMPTY(cthread_reaper_queue))
        {
            rpc__timer_set(&cthread_reaper_timer,
                cthread_reaper, NULL, RPC_C_CTHREAD_REAPER_FREQ);
        }
        RPC_LIST_ADD_TAIL (cthread_reaper_queue, p, cthread_pool_elt_p_t);
    }

    /*
     * The caller's handle is dead either way.
     */
    *phandle = NULL;

    RPC_MUTEX_UNLOCK (cthread_mutex);
}
1937
1938/*
1939**++
1940**
1941**  ROUTINE NAME:       rpc_server_set_thread_pool_fn
1942**
1943**  SCOPE:              PUBLIC - declared in rpcpvt.idl
1944**
1945**  DESCRIPTION:
1946**
1947**  [Un]Register a thread pool lookup function with the runtime.
1948**
1949**  INPUTS:
1950**
1951**      pool_fn         the lookup function - may be NULL
1952**
1953**  INPUTS/OUTPUTS:     none
1954**
1955**  OUTPUTS:
1956**
1957**      status          A value indicating the status of the routine.
1958**
1959**          rpc_s_ok
1960**
1961**  IMPLICIT INPUTS:    none
1962**
1963**  IMPLICIT OUTPUTS:   none
1964**
1965**  FUNCTION VALUE:     none
1966**
1967**  SIDE EFFECTS:       none
1968**
1969**--
1970**/
1971
1972PUBLIC void rpc_server_set_thread_pool_fn
1973(
1974    rpc_thread_pool_fn_t pool_fn,
1975    unsigned32      *status
1976)
1977{
1978    CODING_ERROR (status);
1979    RPC_VERIFY_INIT ();
1980
1981    RPC_MUTEX_LOCK (cthread_mutex);
1982
1983    if (pool_fn != NULL && cthread_pool_lookup_fn != NULL)
1984    {
1985        *status = -1; /* !!! already set */
1986        goto CLEANUP;
1987    }
1988
1989    cthread_pool_lookup_fn = pool_fn;   /* be it NULL or otherwise */
1990    *status = rpc_s_ok;
1991
1992CLEANUP:
1993
1994    RPC_MUTEX_UNLOCK (cthread_mutex);
1995}
1996
1997/*
1998**++
1999**
**  ROUTINE NAME:       rpc_server_set_thread_pool_qlen
2001**
2002**  SCOPE:              PUBLIC - declared in rpcpvt.idl
2003**
2004**  DESCRIPTION:
2005**
2006**  Adjust the maximum number of queued calls for a specified thread pool.
2007**
2008**  INPUTS:
2009**
**      phandle         the pool whose queue size is being adjusted;
**                      a NULL argument can be used to specify that the
**                      operation should be applied to the default pool.
2013**      queue_size      the new size
2014**
2015**  INPUTS/OUTPUTS:     none
2016**
2017**  OUTPUTS:
2018**
2019**      status          A value indicating the status of the routine.
2020**
2021**          rpc_s_ok
2022**
2023**  IMPLICIT INPUTS:    none
2024**
2025**  IMPLICIT OUTPUTS:   none
2026**
2027**  FUNCTION VALUE:     none
2028**
2029**  SIDE EFFECTS:       none
2030**
2031**--
2032**/
2033
PUBLIC void rpc_server_set_thread_pool_qlen
(
    rpc_thread_pool_handle_t phandle,
    unsigned32 queue_size,
    unsigned32 * status
)
{
    cthread_pool_elt_p_t pool = (cthread_pool_elt_p_t) phandle;

    CODING_ERROR (status);
    RPC_VERIFY_INIT ();

    RPC_MUTEX_LOCK (cthread_mutex);

    *status = rpc_s_ok;

    /*
     * If the caller sent NULL as the pool parameter, apply the operation
     * to the default pool.
     */
    if (pool == NULL)
    {
        /* Remember the value for a default pool started later. */
        cthread_default_call_queue_size = queue_size;

        /*
         * If the default pool hasn't been started yet, we're done; the
         * global value will be used when it does get started up.  If the
         * default pool *has* been started, just update its max_queued
         * value.
         */
        if (cthread_default_pool != NULL)
        {
            cthread_default_pool->max_queued = queue_size;
        }
    }
    else
    {
        unsigned32 i;

        /*
         * We're operating on a private pool...
         *
         * If this pool has not been started yet, just record the value for
         * the max queue size.  The actual queue element data structure will
         * get created when the pool is started.
         */
        if (RPC_LIST_EMPTY(pool->free_queue))
        {
            pool->max_queued = queue_size;
        }
        else
        {
            /*
             * This private pool has already been started.
             *
             * Considering that calls may currently be queued for this pool, it
             * would be extremely tricky, not to mention probably not useful, to
             * allow the caller to shrink the call queue length.  Only update the
             * queue length if it's being increased.
             */

            if (queue_size > pool->max_queued)
            {
                /*
                 * Alloc up some more queue elements, and add them to the list.
                 */
                for (i = pool->max_queued; i < queue_size; i++)
                {
                    cthread_queue_elt_p_t qe;

                    RPC_MEM_ALLOC (qe,
                                   cthread_queue_elt_p_t,
                                   sizeof (cthread_queue_elt_t),
                                   RPC_C_MEM_CTHREAD_QETBL,
                                   RPC_C_MEM_WAITOK);

                    if (qe == NULL)
                    {
                        *status = rpc_s_no_memory;

                        /*
                         * Try to stay calm... clamp max_queued to the
                         * number of elements actually allocated and bail,
                         * unlocking before the early return.
                         */
                        pool->max_queued = i;

                        RPC_MUTEX_UNLOCK (cthread_mutex);
                        return;
                    }

                    qe->pool = pool;
                    RPC_LIST_ADD_TAIL (pool->free_queue, qe, cthread_queue_elt_p_t);
                }

                pool->max_queued = queue_size;
            }
        }
    }

    RPC_MUTEX_UNLOCK (cthread_mutex);
}
2134
2135/*
2136**++
2137**
2138**  ROUTINE NAME:       rpc__cthread_start_all
2139**
2140**  SCOPE:              PRIVATE - declared in comcthd.h
2141**
2142**  DESCRIPTION:
2143**
**  Arrange for all the call execution threads to be created and for
**  RPC execution to be enabled.
2146**
2147**  INPUTS:
2148**
2149**      default_cthreads The number of default pool call threads which will be
2150**                      created
2151**
2152**  INPUTS/OUTPUTS:     none
2153**
2154**  OUTPUTS:
2155**
2156**      status          A value indicating the status of the routine.
2157**
2158**          rpc_s_ok
2159**          rpc_s_no_memory
2160**
2161**  IMPLICIT INPUTS:    none
2162**
2163**  IMPLICIT OUTPUTS:   none
2164**
2165**  FUNCTION VALUE:     none
2166**
2167**  SIDE EFFECTS:       none
2168**
2169**--
2170**/
2171
PRIVATE void rpc__cthread_start_all
(
    unsigned32              default_pool_cthreads,
    unsigned32              *status
)
{
    cthread_pool_elt_p_t    p;

    CODING_ERROR (status);

    RPC_MUTEX_LOCK (cthread_mutex);

    /*
     * Alloc the default pool if necessary (or just adjust its
     * thread count if it already exists).
     */
    if (cthread_default_pool == NULL)
    {
        cthread_default_pool = cthread_pool_alloc (
                                    default_pool_cthreads,
                                    true, /* is_default_pool */
                                    status);
        if (*status != rpc_s_ok)
            goto CLEANUP;
    }
    else
    {
        cthread_pool_set_threadcnt(cthread_default_pool,
                    default_pool_cthreads, status);
        if (*status != rpc_s_ok)
            goto CLEANUP;
    }

    /*
     * Fire up all of the call executor threads: first the default pool,
     * then each reserved pool.
     */
    cthread_pool_start (cthread_default_pool, status);
    if (*status != rpc_s_ok)
        goto CLEANUP;

    RPC_LIST_FIRST (cthread_reserved_pools, p, cthread_pool_elt_p_t);
    while (p != NULL)
    {
        cthread_pool_start (p, status);
        if (*status != rpc_s_ok)
            goto CLEANUP;
        RPC_LIST_NEXT (p, p, cthread_pool_elt_p_t);
    }

    /*
     * Enable RPC queuing / execution (checked by the call dispatch path).
     */
    cthread_invoke_enabled = true;

    *status = rpc_s_ok;

CLEANUP:

    RPC_MUTEX_UNLOCK (cthread_mutex);
}
2232
2233/*
2234**++
2235**
2236**  ROUTINE NAME:       rpc__cthread_stop_all
2237**
2238**  SCOPE:              PRIVATE - declared in comcthd.h
2239**
2240**  DESCRIPTION:
2241**
2242**  Stop all the call executor threads.  Don't return until all have stopped.
2243**
2244**  INPUTS:             none
2245**
2246**  INPUTS/OUTPUTS:     none
2247**
2248**  OUTPUTS:
2249**
2250**      status          A value indicating the status of the routine.
2251**
2252**
2253**  IMPLICIT INPUTS:    none
2254**
2255**  IMPLICIT OUTPUTS:   none
2256**
2257**  FUNCTION VALUE:     none
2258**
2259**  SIDE EFFECTS:       none
2260**
2261**--
2262**/
2263
2264PRIVATE void rpc__cthread_stop_all
2265(
2266    unsigned32              *status
2267)
2268{
2269
2270    cthread_pool_elt_p_t    p;
2271
2272    CODING_ERROR (status);
2273
2274    RPC_MUTEX_LOCK (cthread_mutex);
2275
2276    /*
2277     * Disable subsequent call execution processing while we're
2278     * waiting for the executors to complete.
2279     */
2280
2281    cthread_invoke_enabled = false;
2282
2283    /*
2284     * Tell each pool to stop.
2285     */
2286    cthread_pool_stop(cthread_default_pool, false, status);
2287    if (*status != rpc_s_ok)
2288        goto CLEANUP;
2289
2290    RPC_LIST_FIRST (cthread_reserved_pools, p, cthread_pool_elt_p_t);
2291    while (p != NULL)
2292    {
2293        cthread_pool_stop(p, false, status);
2294        if (*status != rpc_s_ok)
2295            goto CLEANUP;
2296        RPC_LIST_NEXT (p, p, cthread_pool_elt_p_t);
2297    }
2298
2299    /*
2300     * Now wait for each pool's threads to complete.
2301     */
2302    cthread_pool_stop(cthread_default_pool, true, status);
2303    if (*status != rpc_s_ok)
2304        goto CLEANUP;
2305
2306    RPC_LIST_FIRST (cthread_reserved_pools, p, cthread_pool_elt_p_t);
2307    while (p != NULL)
2308    {
2309        cthread_pool_stop(p, true, status);
2310        if (*status != rpc_s_ok)
2311            goto CLEANUP;
2312        RPC_LIST_NEXT (p, p, cthread_pool_elt_p_t);
2313    }
2314
2315    *status = rpc_s_ok;
2316
2317CLEANUP:
2318
2319    RPC_MUTEX_UNLOCK (cthread_mutex);
2320}
2321
2322
2323/*
2324**++
2325**
2326**  ROUTINE NAME:       rpc__cthread_invoke_null
2327**
2328**  SCOPE:              PRIVATE - declared in comcthd.h
2329**
2330**  DESCRIPTION:
2331**
2332**  Arrange for a call execution thread to (eventually) be allocated to
2333**  "execute" the RPC.
2334**
**  INPUTS:
**
**      object          The object UUID of the incoming call
**      if_uuid         The UUID of the interface being called
**      if_ver          The version of the interface being called
**      if_opnum        The operation number being called
**
**  INPUTS/OUTPUTS:
**
**      call_rep        The call rep for the incoming call.
**      cthread_executor The address of a routine to be called when the
**                      call thread actually wakes up
**      args            A pointer to be passed to the called routine
2343**
2344**  OUTPUTS:
2345**
2346**      status          A value indicating the status of the routine.
2347**
2348**          rpc_s_ok
2349**          rpc_s_cthread_not_found
2350**          rpc_s_call_queued
2351**
2352**  IMPLICIT INPUTS:    none
2353**
2354**  IMPLICIT OUTPUTS:   none
2355**
2356**  FUNCTION VALUE:     void
2357**
2358**  SIDE EFFECTS:
2359**                      call may be queued if no available call executors
2360**
2361**--
2362**/
2363
PRIVATE void rpc__cthread_invoke_null
(
    rpc_call_rep_p_t        call_rep,
    uuid_p_t                object,
    uuid_p_t                if_uuid,
    unsigned32              if_ver,
    unsigned32              if_opnum,
    rpc_prot_cthread_executor_fn_t cthread_executor,
    dce_pointer_t               args,
    unsigned32              *status
)
{
    /* Per-call private cthread state embedded in the call rep. */
    rpc_cthread_pvt_info_p_t    pvt = &call_rep->u.server.cthread;
    unsigned32                  lookup_fn_st;
    cthread_pool_elt_p_t        p;
    cthread_elt_p_t             cthread;

    CODING_ERROR (status);

    RPC_MUTEX_LOCK (cthread_mutex);

    /*
     * Check to ensure that it's still desirable to queue/execute a call.
     *
     * While strictly speaking we need to examine cthread_invoke_enabled
     * under a mutex, we really don't want to pay the cost in this critical
     * path, and things work reasonably safely without it.  The worst that
     * will happen is that a (couple) extra call(s) will be allowed to be
     * queued / executed during shutdown processing.
     */
    if (cthread_invoke_enabled == false)
    {
        *status = rpc_s_cthread_invoke_disabled;
        goto CLEANUP;
    }

    /*
     * Setup fields in the call rep for subsequent execution: the routine
     * the assigned cthread will run, and its argument.
     */
    pvt->executor = cthread_executor;
    pvt->optargs = args;

    /*
     * Attempt to locate / assign an idle thread (this code is
     * in-line because this is the fast-path).  The lookup maps the
     * object/interface/operation to a reserved pool, if one was
     * registered for it.
     */
    CTHREAD_POOL_LOOKUP_RESERVED(object, if_uuid, if_ver, if_opnum,
        &p, &lookup_fn_st);
    if (lookup_fn_st != 0)
    {
        *status = rpc_s_cthread_not_found;
        goto CLEANUP;
    }

    if (p == NULL)
    {
        /*
         * No reserved pool matched; only concerned with default pool.
         */
        p = cthread_default_pool;

        CTHREAD_POOL_ASSIGN_THREAD(cthread_default_pool, &cthread);
    }
    else
    {
        /*
         * First try to assign an idle reserved pool thread; otherwise,
         * fall back to an idle default pool thread.
         */
        CTHREAD_POOL_ASSIGN_THREAD(p, &cthread);
        if (cthread == NULL)
        {
            CTHREAD_POOL_ASSIGN_THREAD(cthread_default_pool, &cthread);
        }
    }

    /*
     * If we've succeeded in assigning a cthread, arrange for it to
     * actually execute the RPC.  Otherwise, attempt to queue the RPC
     * for deferred execution.
     */
    if (cthread != NULL)
    {
        /*
         * Setup fields in the call rep for subsequent execution and
         * bind the call rep and the cthread to each other.
         */
        pvt->is_queued = false;
        pvt->thread_h = cthread->thread_id;
        cthread->call_rep = call_rep;

        /*
         * Fire up the assigned cthread (it waits on this condition
         * variable under cthread_mutex).
         */
        RPC_COND_SIGNAL(cthread->thread_cond, cthread_mutex);

        *status = rpc_s_ok;
    }
    else
    {
        /*
         * No idle thread; queue on the pool selected above (note: p is
         * the reserved pool if one matched, even though the default
         * pool's threads were also tried).
         */
        cthread_pool_queue_call(p, call_rep, status);
        if (*status == rpc_s_ok)
            *status = rpc_s_call_queued;
    }

CLEANUP:

    RPC_MUTEX_UNLOCK (cthread_mutex);
}
2472
2473
2474/*
2475**++
2476**
2477**  ROUTINE NAME:       rpc__cthread_dequeue
2478**
2479**  SCOPE:              PRIVATE - included in comcthd.h
2480**
2481**  DESCRIPTION:        Remove a call rep from the call executor
2482**                      thread waiting queue, if it's there.
2483**
2484**  INPUTS:             none
2485**
2486**  INPUTS/OUTPUTS:
2487**
2488**      call_rep        The call rep to be dequeued.
2489**
2490**  OUTPUTS:            none
2491**
2492**  IMPLICIT INPUTS:    none
2493**
2494**  IMPLICIT OUTPUTS:   none
2495**
2496**  FUNCTION VALUE:     boolean
2497**
2498**                      T => call was previously queued.
2499**
2500**  SIDE EFFECTS:       none
2501**
2502**--
2503**/
2504
2505PRIVATE boolean32 rpc__cthread_dequeue
2506(
2507  rpc_call_rep_p_t        call_rep
2508)
2509{
2510    boolean32                   was_dequeued;
2511
2512    RPC_MUTEX_LOCK (cthread_mutex);
2513
2514    was_dequeued = cthread_call_dequeue (call_rep);
2515
2516    RPC_MUTEX_UNLOCK (cthread_mutex);
2517
2518    return was_dequeued;
2519}
2520
2521
2522/*
2523**++
2524**
2525**  ROUTINE NAME:       rpc__cthread_cancel
2526**
2527**  SCOPE:              PRIVATE - included in comcthd.h
2528**
2529**  DESCRIPTION:        Post a cancel to cthread associated with a call
2530**
2531**  INPUTS:             none
2532**
2533**  INPUTS/OUTPUTS:
2534**
2535**      call            The call that the cancel is associated with.
2536**
2537**  OUTPUTS:            none
2538**
2539**  IMPLICIT INPUTS:    none
2540**
2541**  IMPLICIT OUTPUTS:   none
2542**
2543**  FUNCTION VALUE:     void
2544**
2545**  SIDE EFFECTS:
2546**                      a cancel may be posted to the call execution thread
2547**
2548**--
2549**/
2550
2551PRIVATE void rpc__cthread_cancel
2552(
2553  rpc_call_rep_p_t        call
2554)
2555{
2556    RPC_CALL_LOCK_ASSERT(call);
2557
2558    if (!call->u.server.cancel.accepting)
2559        return;
2560
2561    call->u.server.cancel.count++;
2562
2563    if (!call->u.server.cancel.queuing)
2564    {
2565        rpc_cthread_pvt_info_p_t    pvt = &call->u.server.cthread;
2566
2567        RPC_MUTEX_LOCK (cthread_mutex);
2568
2569        dcethread_interrupt_throw(pvt->thread_h);
2570
2571        RPC_MUTEX_UNLOCK (cthread_mutex);
2572    }
2573}
2574
2575/*
2576**++
2577**
2578**  ROUTINE NAME:       rpc__cthread_cancel_caf
2579**
2580**  SCOPE:              PRIVATE - included in comcthd.h
2581**
2582**  DESCRIPTION:        Check for pending cancel and flush.
2583**
2584**  INPUTS:             none
2585**
2586**  INPUTS/OUTPUTS:
2587**
2588**      call            The call that the cancel is associated with.
2589**
2590**  OUTPUTS:            none
2591**
2592**  IMPLICIT INPUTS:    none
2593**
2594**  IMPLICIT OUTPUTS:   none
2595**
2596**  FUNCTION VALUE:
2597**                      boolean32 => T iff call had a pending cancel
2598**
2599**  SIDE EFFECTS:
2600**                      the call will no longer accept cancels
2601**                      any pending cancels will be flushed (i.e. the
2602**                      call thread must not have any residual pending
2603**                      cancels upon completion)
2604**
2605**--
2606**/
2607
/*
 * Check-and-flush: stop accepting cancels for this call, report whether
 * any cancels were still pending, and flush them so the executor thread
 * finishes with no residual pending cancels.  Caller holds the call lock.
 */
PRIVATE boolean32 rpc__cthread_cancel_caf
(
  rpc_call_rep_p_t        call
)
{
    int oc;     /* previous interruptibility state of this thread */

    RPC_CALL_LOCK_ASSERT(call);

    /*
     * In the event this is called multiple times, return something
     * sensible (i.e. return the current "had pending" state).
     */
    if (!call->u.server.cancel.accepting)
    {
        return (call->u.server.cancel.had_pending);
    }

    /*
     * Cancels are no longer accepted by this call.
     */
    call->u.server.cancel.accepting = false;

    /*
     * Determine if the call has a cancel pending (flush any accepted
     * cancels).  Only want to take the expensive path if a cancel request
     * had been previously accepted.
     */
    call->u.server.cancel.had_pending = false;
    if (call->u.server.cancel.count)
    {
#ifndef _PTHREAD_NO_CANCEL_SUPPORT
        /*
         * Briefly enable interrupts and poll; a pending cancel surfaces
         * as a dcethread_interrupt_e exception, which both detects and
         * consumes (flushes) it.
         */
        oc = dcethread_enableinterrupt_throw(1);
        DCETHREAD_TRY
        {
            dcethread_checkinterrupt();
        }
        DCETHREAD_CATCH(dcethread_interrupt_e)
        {
            call->u.server.cancel.had_pending = true;
        }
        DCETHREAD_ENDTRY
        dcethread_enableinterrupt_throw(oc);
#else
        /*
         * Cancels not supported, so the previously accepted forwarded
         * cancels are still pending.
         */
        call->u.server.cancel.had_pending = true;
#endif
    }
    call->u.server.cancel.count = 0;

    /*
     * Let the caller know if a cancel was pending (without them having
     * to look at the flag).
     */
    return (call->u.server.cancel.had_pending);
}
2667
2668/*
2669**++
2670**
2671**  ROUTINE NAME:       rpc__cthread_cancel_enable_post
2672**
2673**  SCOPE:              PRIVATE - included in comcthd.h
2674**
2675**  DESCRIPTION:        Enable direct posting of cancels to a cthread;
2676**                      post any previously queued cancels.
2677**
2678**  INPUTS:             none
2679**
2680**  INPUTS/OUTPUTS:
2681**
2682**      call            The call that the cancel is associated with.
2683**
2684**  OUTPUTS:            none
2685**
2686**  IMPLICIT INPUTS:    none
2687**
2688**  IMPLICIT OUTPUTS:   none
2689**
2690**  FUNCTION VALUE:     void
2691**
2692**  SIDE EFFECTS:
2693**                      a cancel may be posted to the call execution thread
2694**
2695**--
2696**/
2697
2698PRIVATE void rpc__cthread_cancel_enable_post
2699(
2700  rpc_call_rep_p_t        call
2701)
2702{
2703    rpc_cthread_pvt_info_p_t    pvt = &call->u.server.cthread;
2704    unsigned16 cancel_cnt;
2705
2706    RPC_CALL_LOCK_ASSERT(call);
2707
2708    RPC_MUTEX_LOCK (cthread_mutex);
2709
2710    if (call->u.server.cancel.accepting && call->u.server.cancel.queuing)
2711    {
2712        call->u.server.cancel.queuing = false;
2713        for (cancel_cnt = call->u.server.cancel.count; cancel_cnt--; )
2714        {
2715            dcethread_interrupt_throw(pvt->thread_h);
2716        }
2717    }
2718
2719    RPC_MUTEX_UNLOCK (cthread_mutex);
2720}
2721