/*
 * Copyright (c) 2010 Apple Inc. All rights reserved.
 *
 * @APPLE_LICENSE_HEADER_START@
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of Apple Inc. ("Apple") nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY APPLE AND ITS CONTRIBUTORS "AS IS" AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL APPLE OR ITS CONTRIBUTORS BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Portions of this software have been released under the following terms:
 *
 * (c) Copyright 1989-1993 OPEN SOFTWARE FOUNDATION, INC.
 * (c) Copyright 1989-1993 HEWLETT-PACKARD COMPANY
 * (c) Copyright 1989-1993 DIGITAL EQUIPMENT CORPORATION
 *
 * To anyone who acknowledges that this file is provided "AS IS"
 * without any express or implied warranty:
 * permission to use, copy, modify, and distribute this file for any
 * purpose is hereby granted without fee, provided that the above
 * copyright notices and this notice appears in all source code copies,
 * and that none of the names of Open Software Foundation, Inc., Hewlett-
 * Packard Company or Digital Equipment Corporation be used
 * in advertising or publicity pertaining to distribution of the software
 * without specific, written prior permission. Neither Open Software
 * Foundation, Inc., Hewlett-Packard Company nor Digital
 * Equipment Corporation makes any representations about the suitability
 * of this software for any purpose.
 *
 * Copyright (c) 2007, Novell, Inc. All rights reserved.
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of Novell Inc. nor the names of its contributors
 *    may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED.
 * IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * @APPLE_LICENSE_HEADER_END@
 */
/*
**
**  NAME
**
**      comcthd.c
**
**  FACILITY:
**
**      Remote Procedure Call (RPC)
**
**  ABSTRACT:
**
**  Definition of the Call Thread Services for the Common
**  Communication Services component. These routines permit
**  a call thread to be created and invoked.
**
**
*/

#include <commonp.h>    /* Common declarations for all RPC runtime */
#include <com.h>        /* Common communications services */
#include <comprot.h>    /* Common protocol services */
#include <comnaf.h>     /* Common network address family services */
#include <comp.h>       /* Private communications services */
#include <comcthd.h>    /* Shared call thread services */

/*
 * The multiplier to apply to a pool's n_threads to get the queue depth.
 */
#ifndef RPC_C_CTHREAD_QUEUE_MULTIPLIER
# define RPC_C_CTHREAD_QUEUE_MULTIPLIER 8
#endif

/*
 * Call thread states
 */
#define RPC_C_NO_CTHREAD        0   /* no thread exists */
#define RPC_C_IDLE_CTHREAD      1   /* thread exists but is idle */
#define RPC_C_ACTIVE_CTHREAD    2   /* thread has call allocated to it */

/*
 * The call thread table element structure.
 * See the "Call Thread Pool" description for more info...
 *
 * When a thread is idle, it is waiting on its private condition variable.
 *
 * The per element "pool" pointer simply solves the problem of needing
 * the created thread to have two args (without having to alloc memory
 * just for the create phase). The space really isn't "wasted" since
 * each thread would otherwise have to keep this var on their stack.
 */
typedef struct
{
    unsigned8                   thread_state;
    dcethread*                  thread_id;
    rpc_cond_t                  thread_cond;
    struct cthread_pool_elt_t   *pool;
    rpc_call_rep_p_t            call_rep;
} cthread_elt_t, *cthread_elt_p_t;

/*
 * Reserved Pool Call Queue Element.
 *
 * These queue elements form a backing structure that allows
 * us to maintain a call_rep on two pool lists.
 */
typedef struct
{
    rpc_list_t                  link;
    struct cthread_pool_elt_t   *pool;
    rpc_call_rep_p_t            call_rep;
} cthread_queue_elt_t, *cthread_queue_elt_p_t;

/*
 * Call Thread Pools.
 *
 * READ THIS IF YOU WANT TO UNDERSTAND THIS STUFF!
 *
 * This structure exists to accomplish the desired effect of allowing
 * applications to have calls execute using threads from some application
 * defined set of thread pools. The application declared thread pools are
 * referred to as "reserved" pools (due to a previous incarnation of this
 * code which provided -a too limited scheme of- thread reservation on a
 * per-interface basis).
 *
 * The application (via the pool lookup function callout) gets to decide
 * which pool a call should be associated with. This general mechanism
 * provides the application with the basic hooks to implement any one
 * of a number of schemes for allocating calls to threads.
 *
 * If the application declares that the call should use a reserved pool,
 * a free thread from that pool will be allocated; otherwise, a default
 * pool free thread will be allocated; otherwise, the call will be put
 * on the default and reserved pool queue for execution by the first
 * available default or reserved pool thread.
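 *
 * As a purely illustrative sketch (not part of this file; the pool
 * handle, thread count and names are hypothetical), an application
 * routes calls to a reserved pool by registering a lookup function
 * whose argument list follows the CTHREAD_POOL_LOOKUP_RESERVED callout
 * below.  Returning a NULL pool handle selects the default pool:
 *
 *      static rpc_thread_pool_handle_t audit_pool;
 *
 *      static void audit_pool_lookup
 *          (uuid_p_t object, rpc_if_id_t *if_id, unsigned32 opnum,
 *           rpc_thread_pool_handle_t *phandle, unsigned32 *status)
 *      {
 *          *phandle = audit_pool;
 *          *status = 0;
 *      }
 *
 *      unsigned32 st;
 *      rpc_server_create_thread_pool (4, &audit_pool, &st);
 *      rpc_server_set_thread_pool_fn (audit_pool_lookup, &st);
 *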
 * In any case, the calls for a reserved pool are assigned idle
 * execution threads in the order in which they are received.
 *
 * The default pool is created by rpc_server_listen(). Non-reserved
 * (default pool) threads can execute *any* call (the idle threads are
 * assigned to calls in the order in which they are received). The total
 * number of call threads in a server is the sum of the threads in the
 * default pool and the reserved pools.
 *
 * The relationship between n_queued, max_queued and the call_queue
 * requires some explanation. n_queued and max_queued represent,
 * respectively, the number and the limit of call_queue entries
 * FOR THIS POOL TYPE. For *reserved pools*, all of these variables
 * make sense in an intuitive way: n_queued always represents the true
 * number of elements on the queue and the number of elements on the
 * queue will not exceed max_queued.
 *
 * The default pool's use of these variables is less intuitive since
 * *all* queued calls are on the default pool's queue. In this case,
 * the number of elements on the queue can *exceed* the default pool's
 * n_queued and max_queued! n_queued and max_queued (as stated above)
 * strictly represent the number of default pool calls (i.e. those calls
 * that are not associated with a reserved pool). The end result is
 * that this accomplishes the desired max queuing limitations - i.e.
 * the maximums are imposed on a per pool basis; use the queue to
 * determine if there are actually any queued calls to process,
 * NOT n_queued.
 *
 * The default pool uses its pool.call_queue to directly link call reps
 * and it does not use the pool.free_queue. The reserved pools can't
 * directly link the call reps (because call reps only have one rpc_list
 * "thread", and the rpc_list macros only work for a single
 * "thread"...). Therefore, the reserved pools each maintain their
 * own (static) set of cthread_queue_elt_t elements. Reserved pools'
 * call_queue consists of a set of call_queue_elts that point to the
 * associated call_reps (which are on the default pool queue). The
 * call_queue_elts are maintained on the pool's pool.free_queue when
 * not in use on the pool.call_queue.
 *
 * Startup / Shutdown processing... We have a requirement that
 * rpc_server_listen() can be called, return and then be called again
 * (a startup, shutdown, restart sequence). All call threads should
 * be terminated by a shutdown (to free up resources). All threads
 * (including those requested by previous calls to
 * rpc_server_create_thread_pool()) must be automatically recreated upon
 * a restart.
 *
 * Pool creation is a two-step process. First, a pool descriptor is
 * allocated (cthread_pool_alloc()). Subsequently, a pool may be "started"
 * (cthread_pool_start()); this actually creates the call threads.
 *
 * Shutting down involves stopping all of the call threads in all of
 * the pools (including freeing each pool's call thread table). The
 * pool descriptors are not freed. This is necessary to retain information
 * that is needed to restart the server.
 */
typedef struct cthread_pool_elt_t
{
    rpc_list_t          link;           /* linked list of pools */
    unsigned16          n_threads;      /* total number of threads in the pool */
    unsigned16          n_idle;         /* number of idle (available) threads */
    cthread_elt_p_t     ctbl;           /* the cthreads associated with the pool */
    cthread_elt_p_t     idle_cthread;   /* pointer to a known idle cthread */
    unsigned32          n_queued;       /* see above! */
    unsigned32          max_queued;     /* see above!
 */
    rpc_list_t          call_queue;     /* see above!; list of calls queued */
    rpc_list_t          free_queue;     /* see above!; list of free call_queue elements */
    unsigned            stop : 1;       /* T => pool's threads stop when complete */
    unsigned            queue_elt_alloc : 1;    /* T => start() should allocate queue elts */
} cthread_pool_elt_t, *cthread_pool_elt_p_t;

/*
 * Pools are only associated with the MAJOR version of an interface
 * (e.g., an if_ver of 0x00020001 encodes major version 1, minor version 2).
 */
#define IF_VERS_MAJOR(_vers)    ((_vers) & 0xffff)
#define IF_VERS_MINOR(_vers)    (((_vers) >> 16) & 0xffff)

/*
 * A couple of macros for convenience.
 */
#define CTHREAD_POOL_IS_QUEUE_FULL(p)   ((p)->n_queued >= (p)->max_queued)

#define CTHREAD_POOL_IS_QUEUE_EMPTY(p)  (RPC_LIST_EMPTY ((p)->call_queue))

/*
 * A couple of macros for (fast path) performance.
 */
#define CTHREAD_POOL_LOOKUP_RESERVED(object, if_uuid, if_ver, if_opnum, p, st) \
{ \
    RPC_MUTEX_LOCK_ASSERT (cthread_mutex); \
    if (cthread_pool_lookup_fn == NULL) \
    { \
        *(p) = NULL; \
        *st = 0; \
    } \
    else \
    { \
        rpc_if_id_t if_id; \
        if_id.uuid = *(if_uuid); \
        if_id.vers_major = IF_VERS_MAJOR(if_ver); \
        if_id.vers_minor = IF_VERS_MINOR(if_ver); \
        (*cthread_pool_lookup_fn) (\
            object, &if_id, if_opnum, \
            (rpc_thread_pool_handle_t *)p, st); \
    } \
}

#define CTHREAD_POOL_ASSIGN_THREAD(p, ct) \
{ \
    RPC_MUTEX_LOCK_ASSERT (cthread_mutex); \
    if ((p)->idle_cthread != NULL) \
    { \
        *(ct) = (p)->idle_cthread; \
        (p)->idle_cthread = NULL; \
        assert((*(ct))->thread_state == RPC_C_IDLE_CTHREAD); \
        (*(ct))->thread_state = RPC_C_ACTIVE_CTHREAD; \
        (p)->n_idle--; \
    } \
    else \
    { \
        *(ct) = cthread_pool_assign_thread(p); \
    } \
}

#define CTHREAD_POOL_IDLE_THREAD(p, ct) \
{ \
    (p)->n_idle++; \
    (p)->idle_cthread = ct; \
}

/*
 * The reserved pools.
 *
 * The pools are linked together via their pool.link field.
 */
INTERNAL rpc_list_t cthread_reserved_pools;

/*
 * A handle to the special default pool.
 */
INTERNAL cthread_pool_elt_p_t cthread_default_pool;

/*
 * The maximum number of calls that will be queued for the default
 * thread pool. This value is settable via the rpc_server_set_thread_pool_qlen
 * function. If not set, a default value of 8 times the number of
 * default pool threads is used.
 */
INTERNAL unsigned32 cthread_default_call_queue_size;

/*
 * The "reaper's" pool queue and timer.
 *
 * The pools are linked together via their pool.link field.
 */
INTERNAL rpc_list_t cthread_reaper_queue;
INTERNAL rpc_timer_t cthread_reaper_timer;
#ifndef RPC_C_CTHREAD_REAPER_FREQ
# define RPC_C_CTHREAD_REAPER_FREQ  RPC_CLOCK_SEC(3*60)
#endif

/*
 * cthread_mutex protects all of the cthread private structures.
 */
INTERNAL rpc_mutex_t cthread_mutex;

/*
 * A global that controls the overall ability of RPCs to be assigned
 * to a pool/thread for execution (i.e. it controls rpc__cthread_invoke_null).
 */
INTERNAL boolean cthread_invoke_enabled;

/*
 * A global that points to the application specified thread pool lookup
 * function.
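 *
 * The callout (see CTHREAD_POOL_LOOKUP_RESERVED above) is handed the
 * call's object UUID, interface id and opnum; it returns a pool handle
 * (NULL selects the default pool) and a status (a non-zero status causes
 * the call to be rejected with rpc_s_cthread_not_found).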
 */
INTERNAL rpc_thread_pool_fn_t cthread_pool_lookup_fn;

INTERNAL void cthread_create (
        cthread_elt_p_t  /*cthread*/,
        unsigned32 * /*status*/
    );

INTERNAL void cthread_call_executor (
        cthread_elt_p_t  /*cthread*/
    );

INTERNAL void cthread_reaper (
        dce_pointer_t  /*arg*/
    );

INTERNAL cthread_pool_elt_p_t cthread_pool_alloc (
        unsigned32  /*n_threads*/,
        boolean32  /*is_default_pool*/,
        unsigned32 * /*status*/
    );

INTERNAL void cthread_pool_set_threadcnt (
        cthread_pool_elt_p_t  /*p*/,
        unsigned32  /*n_threads*/,
        unsigned32 * /*status*/
    );

INTERNAL void cthread_pool_free (
        cthread_pool_elt_p_t  /*p*/,
        unsigned32 * /*status*/
    );

INTERNAL void cthread_pool_start (
        cthread_pool_elt_p_t  /*p*/,
        unsigned32 * /*status*/
    );

INTERNAL void cthread_pool_stop (
        cthread_pool_elt_p_t  /*p*/,
        unsigned32  /*wait_flag*/,
        unsigned32 * /*status*/
    );

INTERNAL cthread_elt_p_t cthread_pool_assign_thread (
        cthread_pool_elt_p_t  /*p*/
    );

INTERNAL void cthread_pool_queue_call (
        cthread_pool_elt_p_t  /*p*/,
        rpc_call_rep_p_t  /*call_rep*/,
        unsigned32 * /*status*/
    );

INTERNAL rpc_call_rep_p_t cthread_pool_dequeue_first (
        cthread_pool_elt_p_t  /*p*/
    );

INTERNAL boolean32 cthread_call_dequeue (
        rpc_call_rep_p_t  /*call_rep*/
    );

/*
**++
**
**  ROUTINE NAME:       cthread_create
**
**  SCOPE:              INTERNAL
**
**  DESCRIPTION:
**
**  Create a call thread and initialize the table entry.
**
**  INPUTS:
**
**      cthread         The cthread_table entry to use.
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:
**
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

INTERNAL void cthread_create
(
    cthread_elt_p_t volatile cthread,
    unsigned32 *status
)
{
    dcethread* handle_copy;

    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);
    CODING_ERROR (status);

    /*
     * Create a thread for this entry, passing it a pointer to its
     * call thread table entry. Detach the thread since no one
     * ever joins with the thread (and we don't want it to become
     * forever zombie'd when it terminates).
     */
    DCETHREAD_TRY
    {
        dcethread_create_throw (&cthread->thread_id,
            &rpc_g_server_dcethread_attr,
            (dcethread_startroutine)cthread_call_executor,
            (dcethread_addr)cthread);
        cthread->thread_state = RPC_C_IDLE_CTHREAD;
        handle_copy = cthread->thread_id;
        dcethread_detach_throw(handle_copy);
        *status = rpc_s_ok;
    }
    DCETHREAD_CATCH_ALL(THIS_CATCH)
    {
        *status = rpc_s_cthread_create_failed;
        /* FIXME MNE */
        fprintf(stderr, "XXX MIREK: %s: %s: %d: cthread creation failure\n",
                __FILE__, __PRETTY_FUNCTION__, __LINE__);
    }
    DCETHREAD_ENDTRY

    return;
}

/*
**++
**
**  ROUTINE NAME:       cthread_call_executor
**
**  SCOPE:              INTERNAL
**
**  DESCRIPTION:
**
**  The base routine of all call executor threads. Loop awaiting
**  and processing calls until told to stop.
**
**  INPUTS:
**
**      cthread         Pointer to the thread's call table element
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:            none
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

INTERNAL void cthread_call_executor
(
    cthread_elt_p_t cthread
)
{
    rpc_call_rep_t *call_rep = NULL;
    rpc_cthread_pvt_info_p_t pvt = NULL;
    cthread_pool_elt_p_t p = cthread->pool;
    boolean skip_startup = true;

    /*
     * Call executors execute with general cancelability disabled
     * until the stub dispatches to the manager. This prevents the
     * call executor from having a pending cancel delivered to it before
     * the manager is called.
     */
    dcethread_enableinterrupt_throw(0);

    RPC_MUTEX_LOCK (cthread_mutex);

    if (CTHREAD_POOL_IS_QUEUE_EMPTY(p))
    {
        skip_startup = false;
    }

    /*
     * Loop executing calls until we're told to exit.
     */
    while (true)
    {
        boolean run_queued_call = false;

        if (!skip_startup)
        {
            /*
             * Update the pool's idle thread info.
             */
            CTHREAD_POOL_IDLE_THREAD(p, cthread);

            /*
             * Wait for a call assignment (or until we're told to exit).
             */
            while (cthread->thread_state == RPC_C_IDLE_CTHREAD && ! p->stop)
            {
                RPC_COND_WAIT (cthread->thread_cond, cthread_mutex);
            }

            /*
             * If we've been told to stop, then do so.
             */
            if (p->stop)
            {
                break;
            }

            /*
             * Setup the call that was assigned to us.
             */
            call_rep = cthread->call_rep;
            assert(call_rep != NULL);
            pvt = &call_rep->u.server.cthread;
        }

        /*
         * Execute the call assigned to us, followed by any queued calls.
         */
        do
        {
            if (!skip_startup)
            {
                RPC_DBG_PRINTF (rpc_e_dbg_general, 15,
                    ("(cthread_call_executor) pool %p cthread %p executing call_rep %p\n",
                    p, cthread, call_rep));

                /*
                 * Unlock the cthread_mutex while the call is executing.
                 */
                RPC_MUTEX_UNLOCK (cthread_mutex);

                /*
                 * Invoke the routine provided when this thread was invoked
                 * with the argument provided. The routine is always called
                 * with general cancelability disabled AND WITH THE CALL LOCKED.
                 * Since we don't have reference counts (in the common code)
                 * this call reference and lock is 'handed off' to the routine
                 * (which is responsible for releasing the lock). Upon completion
                 * of the 'routine' we can no longer reference the call (it may
                 * no longer exist).
                 */
                RPC_CALL_LOCK(cthread->call_rep);

                (*(pvt->executor)) (pvt->optargs, run_queued_call);

                /*
                 * Reacquire the cthread mutex and check for queued calls.
                 * As the above comment says, we no longer hold the call lock
                 * at this point.
                 */
                RPC_MUTEX_LOCK (cthread_mutex);
            }

            /*
             * Select the oldest queued call; remove it from its queue(s)
             * and setup to execute it.
             */
            skip_startup = false;

            if (CTHREAD_POOL_IS_QUEUE_EMPTY(p))
            {
                run_queued_call = false;
                continue;
            }

            call_rep = cthread_pool_dequeue_first(p);
            pvt = &call_rep->u.server.cthread;

            /*
             * Fill in the thread_h of the protocol specific call
             * handle for use by the protocol module.
             */
            pvt->thread_h = cthread->thread_id;

            /*
             * Update the cthread table entry for this call just to be
             * consistent.
             */
            cthread->call_rep = call_rep;

            /*
             * Indicate there's a queued call to process.
             */
            run_queued_call = true;
        } while (run_queued_call);

        /*
         * Free up this thread to be allocated again.
         */
        cthread->thread_state = RPC_C_IDLE_CTHREAD;
    }

    RPC_DBG_PRINTF (rpc_e_dbg_general, 5,
        ("(cthread_call_executor) pool %p cthread %p stopped\n",
        p, cthread));

    /*
     * Notify others that the cthread is exiting.
     */
    cthread->thread_state = RPC_C_NO_CTHREAD;
    RPC_COND_BROADCAST (cthread->thread_cond, cthread_mutex);

    RPC_MUTEX_UNLOCK (cthread_mutex);
}

/*
**++
**
**  ROUTINE NAME:       cthread_reaper
**
**  SCOPE:              INTERNAL
**
**  DESCRIPTION:
**
**  Free pools as they become idle
**  (this is run periodically from the timer thread).
**
**  INPUTS:             none
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:            none
**
**  IMPLICIT INPUTS:
**      cthread_reaper_queue    the queue of waiting to be freed pools
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

INTERNAL void cthread_reaper
(
    dce_pointer_t unused_arg ATTRIBUTE_UNUSED
)
{
    cthread_pool_elt_p_t p, np;
    unsigned32 i;
    unsigned32 st;
    cthread_elt_p_t cthread;
    boolean free_pool;

    RPC_MUTEX_LOCK (cthread_mutex);

    /*
     * Scan the queue looking for (and freeing) idle pools.
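     * A pool counts as idle only once every entry in its ctbl has
     * reached RPC_C_NO_CTHREAD; pools with threads still winding down
     * are left on the queue for a later sweep.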
     */
    RPC_LIST_FIRST(cthread_reaper_queue, p, cthread_pool_elt_p_t);
    while (p != NULL)
    {
        free_pool = true;

        if (p->ctbl != NULL)
        {
            /*
             * See if all of the pool's threads have completed.
             */
            for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++)
            {
                if (cthread->thread_state != RPC_C_NO_CTHREAD)
                {
                    free_pool = false;
                    break;
                }
            }
        }

        if (! free_pool)
        {
            RPC_LIST_NEXT (p, p, cthread_pool_elt_p_t);
            continue;
        }

        RPC_DBG_PRINTF (rpc_e_dbg_general, 5,
            ("(cthread_reaper) freeing pool %p\n", p));

        /*
         * Remove the pool from the reaper's queue (pool free really
         * frees the storage)... but first, determine the "next pool"
         * so we can continue the scan.
         */
        RPC_LIST_NEXT (p, np, cthread_pool_elt_p_t);
        RPC_LIST_REMOVE (cthread_reaper_queue, p);

        /*
         * Free up the pool's descriptor.
         */
        cthread_pool_free(p, &st);

        /*
         * Continue scanning with the next on the list.
         */
        p = np;
    }

    /*
     * Shutdown the reaper timer when there's nothing to reap.
     */
    if (RPC_LIST_EMPTY(cthread_reaper_queue))
        rpc__timer_clear(&cthread_reaper_timer);

    RPC_MUTEX_UNLOCK (cthread_mutex);
}

/*
**++
**
**  ROUTINE NAME:       cthread_pool_alloc
**
**  SCOPE:              INTERNAL
**
**  DESCRIPTION:
**
**  Allocate the resources for a pool (cthread_pool_start() actually creates the
**  threads).
**
**  INPUTS:
**
**      n_threads       number of call threads in the pool
**      is_default_pool T => the pool being allocated is the default pool
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:
**
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:
**
**      p               the created pool
**
**  SIDE EFFECTS:       none
**
**--
**/

INTERNAL cthread_pool_elt_p_t cthread_pool_alloc
(
    unsigned32 n_threads,
    boolean32 is_default_pool,
    unsigned32 *status
)
{
    cthread_pool_elt_p_t p = NULL;

    CODING_ERROR (status);
    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);

    /*
     * Check for the sanity of the number of threads.
     */
    if (n_threads == 0)
    {
        *status = rpc_s_max_calls_too_small;
        return p;
    }

    /*
     * Alloc a pool descriptor.
     */
    RPC_MEM_ALLOC (p, cthread_pool_elt_p_t, sizeof (cthread_pool_elt_t),
        RPC_C_MEM_CTHREAD_POOL, RPC_C_MEM_WAITOK);
    if (p == NULL)
    {
        *status = rpc_s_no_memory;
        return p;
    }

    /*
     * Init the fields in the pool descriptor.
     */
    RPC_LIST_INIT (p->link);
    p->n_threads = n_threads;
    p->n_idle = 0;
    p->ctbl = NULL;
    p->idle_cthread = NULL;
    p->n_queued = 0;

    /*
     * If the application has indicated a preference for the call queue depth
     * of the default pool, use that. Otherwise, default to 8 times the number
     * of threads in the pool.
     */
    if (is_default_pool && cthread_default_call_queue_size != 0)
        p->max_queued = cthread_default_call_queue_size;
    else
        p->max_queued = RPC_C_CTHREAD_QUEUE_MULTIPLIER * n_threads;

    RPC_LIST_INIT (p->call_queue);
    RPC_LIST_INIT (p->free_queue);
    p->stop = false;
    p->queue_elt_alloc = ! is_default_pool;

    *status = rpc_s_ok;

/* CLEANUP: */

    if (*status != rpc_s_ok)
    {
        if (p != NULL)
            RPC_MEM_FREE (p, RPC_C_MEM_CTHREAD_POOL);
        p = NULL;
    }

    return p;
}

/*
**++
**
**  ROUTINE NAME:       cthread_pool_set_threadcnt
**
**  SCOPE:              INTERNAL
**
**  DESCRIPTION:
**
**  Modify the number of threads associated with the pool.
**  This is not intended to work generically; it is only
**  supposed to work on "idle" pools (alloc'ed but not started,
**  or started and then stopped).
**
**  INPUTS:
**
**      p               the pool whose count to modify
**      n_threads       the new number of threads
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:
**
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

INTERNAL void cthread_pool_set_threadcnt
(
    cthread_pool_elt_p_t p,
    unsigned32 n_threads,
    unsigned32 *status
)
{
    CODING_ERROR (status);
    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);

    /*
     * Check for the sanity of the number of threads.
     */
    if (n_threads == 0)
    {
        *status = rpc_s_max_calls_too_small;
        return;
    }

    p->n_threads = n_threads;

    /*
     * Use a default call queue size if we're operating on a private pool,
     * or if this is the default pool and the application hasn't previously
     * specified a default call queue size for the default pool.
     */
    if (p != cthread_default_pool || cthread_default_call_queue_size == 0)
        p->max_queued = RPC_C_CTHREAD_QUEUE_MULTIPLIER * n_threads;

    *status = rpc_s_ok;
}

/*
**++
**
**  ROUTINE NAME:       cthread_pool_free
**
**  SCOPE:              INTERNAL
**
**  DESCRIPTION:
**
**  Free the (assumed idle) pool's resources.
**
**  INPUTS:
**
**      p               the pool to free
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:
**
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

INTERNAL void cthread_pool_free
(
    cthread_pool_elt_p_t p,
    unsigned32 *status
)
{
    unsigned32 i;
    cthread_elt_p_t cthread;

    CODING_ERROR (status);
    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);

    /*
     * The assumption is that the pool is idle (all of its threads
     * have terminated).
     */

    /*
     * Clean up and free the ctbl. If there is a ctbl, the assumption
     * is that all of the ctable's entries have valid initialized cv's.
     */
    if (p->ctbl)
    {
        for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++)
        {
            RPC_COND_DELETE (cthread->thread_cond, cthread_mutex);
        }
        RPC_MEM_FREE (p->ctbl, RPC_C_MEM_CTHREAD_CTBL);
        p->ctbl = NULL;
    }

    /*
     * Free up the queue elt table.
     */
    while (! RPC_LIST_EMPTY(p->free_queue))
    {
        cthread_queue_elt_p_t qe;
        RPC_LIST_REMOVE_TAIL(p->free_queue, qe, cthread_queue_elt_p_t);
        RPC_MEM_FREE (qe, RPC_C_MEM_CTHREAD_QETBL);
    }

    /*
     * Free the pool descriptor.
     */
    RPC_MEM_FREE (p, RPC_C_MEM_CTHREAD_POOL);

    *status = rpc_s_ok;
}

/*
**++
**
**  ROUTINE NAME:       cthread_pool_start
**
**  SCOPE:              INTERNAL
**
**  DESCRIPTION:
**
**  Start up the call execution threads for an existing pool.
**
**  INPUTS:
**
**      p               the pool to start
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:
**
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

INTERNAL void cthread_pool_start
(
    cthread_pool_elt_p_t p,
    unsigned32 *status
)
{
    unsigned32 i;
    cthread_elt_p_t cthread;

    CODING_ERROR (status);
    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);

    /*
     * The pool should not currently have any actual call threads.
     */
    if (p->ctbl != NULL)
    {
        RPC_DBG_GPRINTF (
            ("(cthread_pool_start) pool %p orphaning ctbl\n", p));
    }

    /*
     * Allocate the pool's call thread table.
     */
    RPC_MEM_ALLOC (p->ctbl, cthread_elt_p_t,
        p->n_threads * (sizeof (cthread_elt_t)),
        RPC_C_MEM_CTHREAD_CTBL, RPC_C_MEM_WAITOK);
    if (p->ctbl == NULL)
    {
        *status = rpc_s_no_memory;
        return;
    }

    /*
     * Init the pool's cthread table / create the cthreads.
     * Do this in two phases to ensure that the table is
     * sane in the event that thread creation fails and cleanup
     * is necessary.
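     * (Phase one marks every entry RPC_C_NO_CTHREAD and initializes its
     * condition variable; phase two actually creates the threads. A
     * mid-loop creation failure thus leaves only fully initialized
     * entries for the CLEANUP path to tear down via cthread_pool_stop().)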
     */
    for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++)
    {
        cthread->pool = p;
        cthread->thread_state = RPC_C_NO_CTHREAD;
        RPC_COND_INIT (cthread->thread_cond, cthread_mutex);
    }
    for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++)
    {
        cthread_create(cthread, status);
        if (*status != rpc_s_ok)
        {
            RPC_DBG_GPRINTF (
                ("(cthread_pool_start) pool %p couldn't create thread %d\n",
                p, i));
            goto CLEANUP;
        }
    }

    /*
     * Setup additional fields in the pool descriptor.
     */
    p->n_idle = 0;
    p->idle_cthread = NULL;
    p->n_queued = 0;
    RPC_LIST_INIT (p->call_queue);
    RPC_LIST_INIT (p->free_queue);

    /*
     * Allocate the pool's queue elements if necessary.
     */
    if (p->queue_elt_alloc)
    {
        for (i = 0; i < p->max_queued; i++)
        {
            cthread_queue_elt_p_t qe;

            RPC_MEM_ALLOC (qe, cthread_queue_elt_p_t,
                sizeof (cthread_queue_elt_t),
                RPC_C_MEM_CTHREAD_QETBL, RPC_C_MEM_WAITOK);
            if (qe == NULL)
            {
                *status = rpc_s_no_memory;
                goto CLEANUP;
            }
            qe->pool = p;
            RPC_LIST_ADD_TAIL (p->free_queue, qe, cthread_queue_elt_p_t);
        }
    }

    RPC_DBG_PRINTF (rpc_e_dbg_general, 5,
        ("(cthread_pool_start) pool %p (%d threads)\n", p, p->n_threads));

    /*
     * Tell the pool's threads to start.
     */
    p->stop = false;

    *status = rpc_s_ok;

CLEANUP:

    if (*status != rpc_s_ok)
    {
        unsigned32 st;

        if (p->ctbl != NULL)
        {
            cthread_pool_stop(p, true /* wait */, &st);
            p->ctbl = NULL;
        }

        while (! RPC_LIST_EMPTY(p->free_queue))
        {
            cthread_queue_elt_p_t qe;
            RPC_LIST_REMOVE_TAIL(p->free_queue, qe, cthread_queue_elt_p_t);
            RPC_MEM_FREE (qe, RPC_C_MEM_CTHREAD_QETBL);
        }
    }
}

/*
**++
**
**  ROUTINE NAME:       cthread_pool_stop
**
**  SCOPE:              INTERNAL
**
**  DESCRIPTION:
**
**  Stop the pool's call threads.
**
**  INPUTS:
**
**      p               the pool to stop
**      wait_flag       T => wait for threads to stop
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:
**
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

INTERNAL void cthread_pool_stop
(
    cthread_pool_elt_p_t p,
    unsigned32 wait_flag,
    unsigned32 *status
)
{
    int cs;
    unsigned32 i;
    cthread_elt_p_t cthread;

    CODING_ERROR (status);
    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);

    /*
     * If there are no threads associated with the pool, we're done.
     */
    if (p->ctbl == NULL)
    {
        *status = rpc_s_ok;
        return;
    }

    RPC_DBG_PRINTF (rpc_e_dbg_general, 5,
        ("(cthread_pool_stop) pool %p (%d threads) stopping\n",
        p, p->n_threads));

    /*
     * Tell the threads to stop when they complete their current activities.
     */
    p->stop = true;

    /*
     * Unblock any waiting call threads so they detect the 'stop' condition.
     */
    for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++)
    {
        if (cthread->thread_state != RPC_C_NO_CTHREAD)
        {
            RPC_COND_SIGNAL (cthread->thread_cond, cthread_mutex);
        }
    }

    /*
     * If not waiting, we're done.
     */
    if (!wait_flag)
    {
        *status = rpc_s_ok;
        return;
    }

    /*
     * Disable cancel delivery while awaiting cthread termination. This
     * ensures completion and preservation of invariants. If it becomes
     * necessary, we can allow cancels and setup a cleanup handler and
     * in the event of a cancel, queue the pool to the reaper for final
     * cleanup.
     */
    cs = dcethread_enableinterrupt_throw (0);

    /*
     * Wait for all call threads to complete.
     *
     * We wait on the call thread's private cv; the cthread signals its
     * cv prior to exiting. While dcethread_join() would have done the
     * trick, this scheme works just as well and is portable to environments
     * that may have difficulty implementing join (e.g., for Kernel RPC).
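     * (The other half of the handshake is at the bottom of
     * cthread_call_executor(): an exiting thread sets its thread_state
     * to RPC_C_NO_CTHREAD and broadcasts its private cv, which is
     * exactly what the loop below waits for.)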
     */
    for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++)
    {
        while (cthread->thread_state != RPC_C_NO_CTHREAD)
        {
            RPC_COND_WAIT (cthread->thread_cond, cthread_mutex);
        }
    }

    /*
     * Restore the cancel state.
     */
    dcethread_enableinterrupt_throw (cs);

    RPC_DBG_PRINTF (rpc_e_dbg_general, 5,
        ("(cthread_pool_stop) pool %p (%d threads) stopped\n",
        p, p->n_threads));

    /*
     * Clean up and free the ctbl. If there is a ctbl, the assumption
     * is that all of the ctable's entries have valid initialized cv's.
     */
    for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++)
    {
        RPC_COND_DELETE (cthread->thread_cond, cthread_mutex);
    }
    RPC_MEM_FREE (p->ctbl, RPC_C_MEM_CTHREAD_CTBL);
    p->ctbl = NULL;

    /*
     * Free up the queue elt list.
     */
    while (! RPC_LIST_EMPTY(p->free_queue))
    {
        cthread_queue_elt_p_t qe;
        RPC_LIST_REMOVE_TAIL(p->free_queue, qe, cthread_queue_elt_p_t);
        RPC_MEM_FREE (qe, RPC_C_MEM_CTHREAD_QETBL);
    }

    *status = rpc_s_ok;
}

/*
**++
**
**  ROUTINE NAME:       cthread_pool_assign_thread
**
**  SCOPE:              INTERNAL
**
**  DESCRIPTION:
**
**  Locate an idle thread in the indicated pool.
**
**  INPUTS:
**
**      p               the pool to search
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:            none
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:
**
**      cthread         the assigned thread (NULL if none found)
**
**  SIDE EFFECTS:       none
**
**--
**/

INTERNAL cthread_elt_p_t cthread_pool_assign_thread
(
    cthread_pool_elt_p_t p
)
{
    cthread_elt_p_t cthread = NULL;

    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);

    /*
     * Locate an idle call thread (if one exists).
     */
    if (p->n_idle > 0)
    {
        if (p->idle_cthread != NULL)
        {
            cthread = p->idle_cthread;
            assert(cthread->thread_state == RPC_C_IDLE_CTHREAD);
            p->idle_cthread = NULL;
        }
        else
        {
            cthread_elt_p_t ct;

            for (ct = p->ctbl; ct < &p->ctbl[p->n_threads]; ct++)
            {
                if (ct->thread_state == RPC_C_IDLE_CTHREAD)
                {
                    cthread = ct;
                    break;
                }
            }
        }
    }

    if (cthread != NULL)
    {
        cthread->thread_state = RPC_C_ACTIVE_CTHREAD;
        p->n_idle--;
    }

    return cthread;
}

/*
**++
**
**  ROUTINE NAME:       cthread_pool_queue_call
**
**  SCOPE:              INTERNAL
**
**  DESCRIPTION:
**
**  Attempt to queue a call for deferred execution.
**
**  INPUTS:
**
**      p               the call's pool
**      call_rep        the call
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:
**
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**          rpc_s_cthread_not_found
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

INTERNAL void cthread_pool_queue_call
(
    cthread_pool_elt_p_t p,
    rpc_call_rep_p_t call_rep,
    unsigned32 *status
)
{
    rpc_cthread_pvt_info_p_t pvt = &call_rep->u.server.cthread;
    boolean is_default_pool = (p == cthread_default_pool);

    CODING_ERROR (status);
    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);

    /*
     * If the queue is full, we're done.
     */
    if (CTHREAD_POOL_IS_QUEUE_FULL (p))
    {
        RPC_DBG_GPRINTF ((
            "(cthread_pool_queue_call) pool %p full call_rep %p\n",
            p, call_rep));
        *status = rpc_s_cthread_not_found;
        return;
    }

    /*
     * Indicate that the call is queued.
     */
    pvt->is_queued = true;

    /*
     * Always add the call to the default pool's queue.
     *
     * ONLY update the default pool's n_queued if the call is for the
     * default pool (see the cthread_pool_elt description comments above)!
     */
    RPC_LIST_ADD_TAIL (cthread_default_pool->call_queue, call_rep, rpc_call_rep_p_t);
    if (is_default_pool)
    {
        pvt->qelt = NULL;
        p->n_queued++;
    }

    /*
     * If it's a reserved pool, add it to its queue too.
     */
    if (! is_default_pool)
    {
        cthread_queue_elt_p_t qelt;

        RPC_LIST_REMOVE_HEAD(p->free_queue, qelt, cthread_queue_elt_p_t);
        assert (qelt != NULL);
        qelt->call_rep = call_rep;
        pvt->qelt = (dce_pointer_t)qelt;
        RPC_LIST_ADD_TAIL (p->call_queue, qelt, cthread_queue_elt_p_t);
        p->n_queued++;
    }

    RPC_DBG_PRINTF (rpc_e_dbg_general, 5,
        ("(cthread_pool_queue_call) pool %p (now %d) call_rep %p\n",
        p, p->n_queued, call_rep));

    *status = rpc_s_ok;
}

/*
**++
**
**  ROUTINE NAME:       cthread_pool_dequeue_first
**
**  SCOPE:              INTERNAL
**
**  DESCRIPTION:        Remove the first queued call rep from a pool.
**
**  INPUTS:             none
**
**  INPUTS/OUTPUTS:
**
**      p               The pool of interest
**
**  OUTPUTS:            none
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:
**
**      call_rep        The dequeued call rep (may be NULL).
**
**  SIDE EFFECTS:       none
**
**--
**/

INTERNAL rpc_call_rep_p_t cthread_pool_dequeue_first
(
    cthread_pool_elt_p_t p
)
{
    rpc_call_rep_p_t call_rep;
    boolean is_default_pool = (p == cthread_default_pool);

    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);

    /*
     * If the queue is empty we're done.
     */
    if (CTHREAD_POOL_IS_QUEUE_EMPTY(p))
    {
        return NULL;
    }

    /*
     * Determine the call rep of interest and then dequeue it.
     */
    if (is_default_pool)
    {
        /*
         * The default pool's queue is the queue of call reps.
         */
        RPC_LIST_FIRST (p->call_queue, call_rep, rpc_call_rep_p_t);
    }
    else
    {
        cthread_queue_elt_p_t qelt;

        /*
         * The call was really for a reserved pool; determine the
         * call rep via the indirection queue elt.
         */
        RPC_LIST_FIRST (p->call_queue, qelt, cthread_queue_elt_p_t);
        call_rep = qelt->call_rep;
        assert ((cthread_queue_elt_p_t)call_rep->u.server.cthread.qelt == qelt);
    }

    (void) cthread_call_dequeue (call_rep);

    return call_rep;
}

/*
**++
**
**  ROUTINE NAME:       cthread_call_dequeue
**
**  SCOPE:              INTERNAL
**
**  DESCRIPTION:        Remove a call rep from the call executor
**                      thread waiting queue, if it's there.
**
**  INPUTS:             none
**
**  INPUTS/OUTPUTS:
**
**      call_rep        The call rep to be dequeued.
**
**  OUTPUTS:            none
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     boolean
**
**      T => call was previously queued.
**
**  SIDE EFFECTS:       none
**
**--
**/

INTERNAL boolean32 cthread_call_dequeue
(
    rpc_call_rep_p_t call_rep
)
{
    rpc_cthread_pvt_info_p_t pvt = &call_rep->u.server.cthread;
    cthread_queue_elt_p_t qelt = (cthread_queue_elt_p_t)pvt->qelt;
    cthread_pool_elt_p_t p;

    RPC_MUTEX_LOCK_ASSERT (cthread_mutex);

    /*
     * If the call's not queued, we're done.
     */
    if (! pvt->is_queued)
    {
        return false;
    }

    /*
     * Dequeue the call from the default pool.
     */
    RPC_LIST_REMOVE (cthread_default_pool->call_queue, call_rep);

    /*
     * The call may or may not have been for the default pool.
     */
    if (qelt == NULL)
    {
        /*
         * The call was for the default pool; adjust the
         * default pool queue count (see the cthread_pool_elt
         * description).
         */
        p = cthread_default_pool;
        cthread_default_pool->n_queued--;
    }
    else
    {
        /*
         * The call was really for a reserved pool;
         * remove it from that queue too.
         */
        p = qelt->pool;
        assert (qelt->call_rep == call_rep);
        assert ((cthread_queue_elt_p_t)pvt->qelt == qelt);
        RPC_LIST_REMOVE (p->call_queue, qelt);
        p->n_queued--;

        /*
         * Return the queue elt to its free list.
         */
        qelt->call_rep = NULL;
        RPC_LIST_ADD_HEAD (p->free_queue, qelt, cthread_queue_elt_p_t);
    }

    /*
     * The call is no longer queued.
     */
    pvt->is_queued = false;
    pvt->qelt = NULL;

    RPC_DBG_PRINTF (rpc_e_dbg_general, 5,
        ("(cthread_call_dequeue) pool %p (%d remain) call_rep %p\n",
        p, p->n_queued, call_rep));

    return true;
}

/*
**++
**
**  ROUTINE NAME:       rpc__cthread_init
**
**  SCOPE:              PRIVATE - declared in comcthd.h
**
**  DESCRIPTION:
**
**  Initialize the cthread package.
**
**  INPUTS:             none
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:
**
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

PRIVATE void rpc__cthread_init
(
    unsigned32 *status
)
{
    CODING_ERROR (status);

    RPC_MUTEX_INIT (cthread_mutex);

    *status = rpc_s_ok;
}

/*
**++
**
**  ROUTINE NAME:       rpc_server_create_thread_pool
**
**  SCOPE:              PUBLIC - declared in rpcpvt.idl
**
**  DESCRIPTION:
**
**  Allocate the resources for a pool (cthread_pool_start() actually
**  creates the threads).
**
**  INPUTS:
**
**      n_threads       number of call threads in the pool
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:
**
**      phandle         Handle to the new pool.
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

PUBLIC void rpc_server_create_thread_pool
(
    unsigned32 n_threads,
    rpc_thread_pool_handle_t *phandle,
    unsigned32 *status
)
{
    cthread_pool_elt_p_t p;

    CODING_ERROR (status);
    RPC_VERIFY_INIT ();

    *phandle = NULL;

    RPC_MUTEX_LOCK (cthread_mutex);

    p = cthread_pool_alloc(n_threads, false /* is_default_pool */, status);
    if (*status != rpc_s_ok)
        goto CLEANUP;

    /*
     * Make the newly created pool "public".
     */
    RPC_LIST_ADD_TAIL (cthread_reserved_pools, p, cthread_pool_elt_p_t);

    *phandle = (rpc_thread_pool_handle_t) p;

    /*
     * Normally, reserved pools are started up when the default pool
     * gets started, as a consequence of calling rpc_server_listen.
     * However, if the default pool has already been started up, then
     * start up this reserved pool immediately so that it will be available
     * for handling calls.
     */
    if (cthread_invoke_enabled)
        cthread_pool_start (p, status);

CLEANUP:

    RPC_MUTEX_UNLOCK (cthread_mutex);
}

/*
**++
**
**  ROUTINE NAME:       rpc_server_free_thread_pool
**
**  SCOPE:              PUBLIC - declared in rpcpvt.idl
**
**  DESCRIPTION:
**
**  Stop the pool's call threads and free the pool resources.
**
**  INPUTS:
**
**      phandle         Pool to free
**      wait_flag       T => wait for threads to stop
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:
**
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

PUBLIC void rpc_server_free_thread_pool
(
    rpc_thread_pool_handle_t *phandle,
    boolean32 wait_flag,
    unsigned32 *status
)
{
    cthread_pool_elt_p_t p = (cthread_pool_elt_p_t) *phandle;

    CODING_ERROR (status);
    RPC_VERIFY_INIT ();

    RPC_MUTEX_LOCK (cthread_mutex);

    /*
     * Remove the pool from the set of reserved pools.
     * For all practical external purposes, the reserved pool
     * no longer exists (though its cthreads may still be executing
     * their current (and queued) calls).
     */
    RPC_LIST_REMOVE (cthread_reserved_pools, p);

    /*
     * Stop the pool's threads (waiting as directed).
     */
    cthread_pool_stop(p, wait_flag, status);

    /*
     * If we waited for the pool to become idle we can immediately free it;
     * otherwise we've got to queue it for eventual freeing (and start up the
     * reaper timer if this is the first item being queued).
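     * The reaper (cthread_reaper) then runs every
     * RPC_C_CTHREAD_REAPER_FREQ (three minutes by default) and frees
     * the pool once all of its threads have reached RPC_C_NO_CTHREAD.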
     */
    if (wait_flag || p->ctbl == NULL)
    {
        unsigned32 st;
        cthread_pool_free(p, &st);
    }
    else
    {
        if (RPC_LIST_EMPTY(cthread_reaper_queue))
        {
            rpc__timer_set(&cthread_reaper_timer,
                cthread_reaper, NULL, RPC_C_CTHREAD_REAPER_FREQ);
        }
        RPC_LIST_ADD_TAIL (cthread_reaper_queue, p, cthread_pool_elt_p_t);
    }

    *phandle = NULL;

/* CLEANUP: */

    RPC_MUTEX_UNLOCK (cthread_mutex);
}

/*
**++
**
**  ROUTINE NAME:       rpc_server_set_thread_pool_fn
**
**  SCOPE:              PUBLIC - declared in rpcpvt.idl
**
**  DESCRIPTION:
**
**  [Un]Register a thread pool lookup function with the runtime.
**
**  INPUTS:
**
**      pool_fn         the lookup function - may be NULL
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:
**
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

PUBLIC void rpc_server_set_thread_pool_fn
(
    rpc_thread_pool_fn_t pool_fn,
    unsigned32 *status
)
{
    CODING_ERROR (status);
    RPC_VERIFY_INIT ();

    RPC_MUTEX_LOCK (cthread_mutex);

    if (pool_fn != NULL && cthread_pool_lookup_fn != NULL)
    {
        *status = -1;   /* !!! already set */
        goto CLEANUP;
    }
    cthread_pool_lookup_fn = pool_fn;   /* be it NULL or otherwise */

    *status = rpc_s_ok;

CLEANUP:

    RPC_MUTEX_UNLOCK (cthread_mutex);
}

/*
**++
**
**  ROUTINE NAME:       rpc_server_set_thread_pool_qlen
**
**  SCOPE:              PUBLIC - declared in rpcpvt.idl
**
**  DESCRIPTION:
**
**  Adjust the maximum number of queued calls for a specified thread pool.
**
**  INPUTS:
**
**      phandle         the pool whose queue size is being adjusted;
**                      a NULL argument can be used to specify that the
**                      operation should be applied to the default pool.
**      queue_size      the new size
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:
**
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

PUBLIC void rpc_server_set_thread_pool_qlen
(
    rpc_thread_pool_handle_t phandle,
    unsigned32 queue_size,
    unsigned32 * status
)
{
    cthread_pool_elt_p_t pool = (cthread_pool_elt_p_t) phandle;

    CODING_ERROR (status);
    RPC_VERIFY_INIT ();

    RPC_MUTEX_LOCK (cthread_mutex);

    *status = rpc_s_ok;

    /*
     * If the caller sent NULL as the pool parameter, apply the operation
     * to the default pool.
     */
    if (pool == NULL)
    {
        cthread_default_call_queue_size = queue_size;

        /*
         * If the default pool hasn't been started yet, we're done; the
         * global value will be used when it does get started up. If the
         * default pool *has* been started, just update its max_queued
         * value.
         */
        if (cthread_default_pool != NULL)
        {
            cthread_default_pool->max_queued = queue_size;
        }
    }
    else
    {
        unsigned32 i;

        /*
         * We're operating on a private pool...
         *
         * If this pool has not been started yet, just record the value for
         * the max queue size. The actual queue element data structure will
         * get created when the pool is started.
         */
        if (RPC_LIST_EMPTY(pool->free_queue))
        {
            pool->max_queued = queue_size;
        }
        else
        {
            /*
             * This private pool has already been started.
             *
             * Considering that calls may currently be queued for this pool, it
             * would be extremely tricky, not to mention probably not useful, to
             * allow the caller to shrink the call queue length. Only update the
             * queue length if it's being increased.
             */
            if (queue_size > pool->max_queued)
            {
                /*
                 * Alloc up some more queue elements, and add them to the list.
                 */
                for (i = pool->max_queued; i < queue_size; i++)
                {
                    cthread_queue_elt_p_t qe;

                    RPC_MEM_ALLOC (qe, cthread_queue_elt_p_t,
                        sizeof (cthread_queue_elt_t),
                        RPC_C_MEM_CTHREAD_QETBL, RPC_C_MEM_WAITOK);
                    if (qe == NULL)
                    {
                        *status = rpc_s_no_memory;

                        /*
                         * Try to stay calm...
                         */
                        pool->max_queued = i;
                        RPC_MUTEX_UNLOCK (cthread_mutex);
                        return;
                    }
                    qe->pool = pool;
                    RPC_LIST_ADD_TAIL (pool->free_queue, qe, cthread_queue_elt_p_t);
                }
                pool->max_queued = queue_size;
            }
        }
    }

    RPC_MUTEX_UNLOCK (cthread_mutex);
}

/*
**++
**
**  ROUTINE NAME:       rpc__cthread_start_all
**
**  SCOPE:              PRIVATE - declared in comcthd.h
**
**  DESCRIPTION:
**
**  Arrange for all the call execution threads to be created and
**  RPC execution enabled.
**
**  INPUTS:
**
**      default_pool_cthreads   The number of default pool call threads
**                              which will be created
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:
**
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**          rpc_s_no_memory
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

PRIVATE void rpc__cthread_start_all
(
    unsigned32 default_pool_cthreads,
    unsigned32 *status
)
{
    cthread_pool_elt_p_t p;

    CODING_ERROR (status);

    RPC_MUTEX_LOCK (cthread_mutex);

    /*
     * Alloc the default pool if necessary (or just adjust its
     * thread count).
     */
    if (cthread_default_pool == NULL)
    {
        cthread_default_pool = cthread_pool_alloc (
            default_pool_cthreads,
            true, /* is_default_pool */
            status);
        if (*status != rpc_s_ok)
            goto CLEANUP;
    }
    else
    {
        cthread_pool_set_threadcnt(cthread_default_pool,
            default_pool_cthreads, status);
        if (*status != rpc_s_ok)
            goto CLEANUP;
    }

    /*
     * Fire up all of the call executor threads.
     */
    cthread_pool_start (cthread_default_pool, status);
    if (*status != rpc_s_ok)
        goto CLEANUP;

    RPC_LIST_FIRST (cthread_reserved_pools, p, cthread_pool_elt_p_t);
    while (p != NULL)
    {
        cthread_pool_start (p, status);
        if (*status != rpc_s_ok)
            goto CLEANUP;
        RPC_LIST_NEXT (p, p, cthread_pool_elt_p_t);
    }

    /*
     * enable RPC queuing / execution
     */
    cthread_invoke_enabled = true;

    *status = rpc_s_ok;

CLEANUP:

    RPC_MUTEX_UNLOCK (cthread_mutex);
}

/*
**++
**
**  ROUTINE NAME:       rpc__cthread_stop_all
**
**  SCOPE:              PRIVATE - declared in comcthd.h
**
**  DESCRIPTION:
**
**  Stop all the call executor threads. Don't return until all have stopped.
**
**  INPUTS:             none
**
**  INPUTS/OUTPUTS:     none
**
**  OUTPUTS:
**
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     none
**
**  SIDE EFFECTS:       none
**
**--
**/

PRIVATE void rpc__cthread_stop_all
(
    unsigned32 *status
)
{
    cthread_pool_elt_p_t p;

    CODING_ERROR (status);

    RPC_MUTEX_LOCK (cthread_mutex);

    /*
     * Disable subsequent call execution processing while we're
     * waiting for the executors to complete.
     */
    cthread_invoke_enabled = false;

    /*
     * Tell each pool to stop.
     */
    cthread_pool_stop(cthread_default_pool, false, status);
    if (*status != rpc_s_ok)
        goto CLEANUP;

    RPC_LIST_FIRST (cthread_reserved_pools, p, cthread_pool_elt_p_t);
    while (p != NULL)
    {
        cthread_pool_stop(p, false, status);
        if (*status != rpc_s_ok)
            goto CLEANUP;
        RPC_LIST_NEXT (p, p, cthread_pool_elt_p_t);
    }

    /*
     * Now wait for each pool's threads to complete.
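     * (The non-waiting pass above lets every pool begin shutting down
     * in parallel; this second pass repeats cthread_pool_stop() with
     * wait_flag true and blocks until each pool's threads have actually
     * exited.)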
     */
    cthread_pool_stop(cthread_default_pool, true, status);
    if (*status != rpc_s_ok)
        goto CLEANUP;

    RPC_LIST_FIRST (cthread_reserved_pools, p, cthread_pool_elt_p_t);
    while (p != NULL)
    {
        cthread_pool_stop(p, true, status);
        if (*status != rpc_s_ok)
            goto CLEANUP;
        RPC_LIST_NEXT (p, p, cthread_pool_elt_p_t);
    }

    *status = rpc_s_ok;

CLEANUP:

    RPC_MUTEX_UNLOCK (cthread_mutex);
}

/*
**++
**
**  ROUTINE NAME:       rpc__cthread_invoke_null
**
**  SCOPE:              PRIVATE - declared in comcthd.h
**
**  DESCRIPTION:
**
**  Arrange for a call execution thread to (eventually) be allocated to
**  "execute" the RPC.
**
**  INPUTS:             none
**
**  INPUTS/OUTPUTS:
**
**      call_rep        The call rep for the incoming call.
**      cthread_executor The address of a routine to be called when the
**                      call thread actually wakes up
**      args            A pointer to be passed to the called routine
**
**  OUTPUTS:
**
**      status          A value indicating the status of the routine.
**
**          rpc_s_ok
**          rpc_s_cthread_not_found
**          rpc_s_call_queued
**          rpc_s_cthread_invoke_disabled
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     void
**
**  SIDE EFFECTS:
**      the call may be queued if no call executors are available
**
**--
**/

PRIVATE void rpc__cthread_invoke_null
(
    rpc_call_rep_p_t call_rep,
    uuid_p_t object,
    uuid_p_t if_uuid,
    unsigned32 if_ver,
    unsigned32 if_opnum,
    rpc_prot_cthread_executor_fn_t cthread_executor,
    dce_pointer_t args,
    unsigned32 *status
)
{
    rpc_cthread_pvt_info_p_t pvt = &call_rep->u.server.cthread;
    unsigned32 lookup_fn_st;
    cthread_pool_elt_p_t p;
    cthread_elt_p_t cthread;

    CODING_ERROR (status);

    RPC_MUTEX_LOCK (cthread_mutex);

    /*
     * Check to ensure that it's still desirable to queue/execute a call.
     *
     * While strictly speaking we need to examine cthread_invoke_enabled
     * under a mutex, we really don't want to pay the cost in this critical
     * path, and things will get by reasonably safely without it. The worst
     * that will happen is that a (couple) extra call(s) will be allowed
     * to be queued / executed during shutdown processing.
     */
    if (cthread_invoke_enabled == false)
    {
        *status = rpc_s_cthread_invoke_disabled;
        goto CLEANUP;
    }

    /*
     * Setup fields in the call rep for subsequent execution.
     */
    pvt->executor = cthread_executor;
    pvt->optargs = args;

    /*
     * Attempt to locate / assign an idle thread (this code is
     * in-line because this is the fast-path).
     */
    CTHREAD_POOL_LOOKUP_RESERVED(object, if_uuid, if_ver, if_opnum,
        &p, &lookup_fn_st);
    if (lookup_fn_st != 0)
    {
        *status = rpc_s_cthread_not_found;
        goto CLEANUP;
    }

    if (p == NULL)
    {
        /*
         * Only concerned with the default pool.
         */
        p = cthread_default_pool;
        CTHREAD_POOL_ASSIGN_THREAD(cthread_default_pool, &cthread);
    }
    else
    {
        /*
         * Try first to assign an idle reserved pool thread; otherwise,
         * assign an idle default pool thread.
         */
        CTHREAD_POOL_ASSIGN_THREAD(p, &cthread);
        if (cthread == NULL)
        {
            CTHREAD_POOL_ASSIGN_THREAD(cthread_default_pool, &cthread);
        }
    }

    /*
     * If we've succeeded in assigning a cthread, arrange for it to
     * actually execute the RPC. Otherwise, attempt to queue the RPC
     * for deferred execution.
     */
    if (cthread != NULL)
    {
        /*
         * Setup fields in the call rep for subsequent execution.
         */
        pvt->is_queued = false;
        pvt->thread_h = cthread->thread_id;
        cthread->call_rep = call_rep;

        /*
         * Fire up the assigned cthread.
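         * (The executor is blocked in RPC_COND_WAIT on its private
         * thread_cond in cthread_call_executor(); the signal below wakes
         * it to pick up the call_rep stored above.)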
         */
        RPC_COND_SIGNAL(cthread->thread_cond, cthread_mutex);

        *status = rpc_s_ok;
    }
    else
    {
        cthread_pool_queue_call(p, call_rep, status);
        if (*status == rpc_s_ok)
            *status = rpc_s_call_queued;
    }

CLEANUP:

    RPC_MUTEX_UNLOCK (cthread_mutex);
}

/*
**++
**
**  ROUTINE NAME:       rpc__cthread_dequeue
**
**  SCOPE:              PRIVATE - declared in comcthd.h
**
**  DESCRIPTION:        Remove a call rep from the call executor
**                      thread waiting queue, if it's there.
**
**  INPUTS:             none
**
**  INPUTS/OUTPUTS:
**
**      call_rep        The call rep to be dequeued.
**
**  OUTPUTS:            none
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     boolean
**
**      T => call was previously queued.
**
**  SIDE EFFECTS:       none
**
**--
**/

PRIVATE boolean32 rpc__cthread_dequeue
(
    rpc_call_rep_p_t call_rep
)
{
    boolean32 was_dequeued;

    RPC_MUTEX_LOCK (cthread_mutex);
    was_dequeued = cthread_call_dequeue (call_rep);
    RPC_MUTEX_UNLOCK (cthread_mutex);

    return was_dequeued;
}

/*
**++
**
**  ROUTINE NAME:       rpc__cthread_cancel
**
**  SCOPE:              PRIVATE - declared in comcthd.h
**
**  DESCRIPTION:        Post a cancel to the cthread associated with a call
**
**  INPUTS:             none
**
**  INPUTS/OUTPUTS:
**
**      call            The call that the cancel is associated with.
**
**  OUTPUTS:            none
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     void
**
**  SIDE EFFECTS:
**      a cancel may be posted to the call execution thread
**
**--
**/

PRIVATE void rpc__cthread_cancel
(
    rpc_call_rep_p_t call
)
{
    RPC_CALL_LOCK_ASSERT(call);

    if (!call->u.server.cancel.accepting)
        return;

    call->u.server.cancel.count++;

    if (!call->u.server.cancel.queuing)
    {
        rpc_cthread_pvt_info_p_t pvt = &call->u.server.cthread;

        RPC_MUTEX_LOCK (cthread_mutex);
        dcethread_interrupt_throw(pvt->thread_h);
        RPC_MUTEX_UNLOCK (cthread_mutex);
    }
}

/*
**++
**
**  ROUTINE NAME:       rpc__cthread_cancel_caf
**
**  SCOPE:              PRIVATE - declared in comcthd.h
**
**  DESCRIPTION:        Check for pending cancel and flush.
**
**  INPUTS:             none
**
**  INPUTS/OUTPUTS:
**
**      call            The call that the cancel is associated with.
**
**  OUTPUTS:            none
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:
**      boolean32 => T iff call had a pending cancel
**
**  SIDE EFFECTS:
**      the call will no longer accept cancels
**      any pending cancels will be flushed (i.e. the
**      call thread must not have any residual pending
**      cancels upon completion)
**
**--
**/

PRIVATE boolean32 rpc__cthread_cancel_caf
(
    rpc_call_rep_p_t call
)
{
    int oc;

    RPC_CALL_LOCK_ASSERT(call);

    /*
     * In the event this is called multiple times, return something
     * sensible (i.e. return the current "had pending" state).
     */
    if (!call->u.server.cancel.accepting)
    {
        return (call->u.server.cancel.had_pending);
    }

    /*
     * Cancels are no longer accepted by this call.
     */
    call->u.server.cancel.accepting = false;

    /*
     * Determine if the call has a cancel pending (flush any accepted
     * cancels). Only want to take the expensive path if a cancel request
     * had been previously accepted.
     */
    call->u.server.cancel.had_pending = false;
    if (call->u.server.cancel.count)
    {
#ifndef _PTHREAD_NO_CANCEL_SUPPORT
        oc = dcethread_enableinterrupt_throw(1);
        DCETHREAD_TRY {
            dcethread_checkinterrupt();
        } DCETHREAD_CATCH(dcethread_interrupt_e) {
            call->u.server.cancel.had_pending = true;
        } DCETHREAD_ENDTRY
        dcethread_enableinterrupt_throw(oc);
#else
        /*
         * Cancels not supported, so the previously accepted forwarded
         * cancels are still pending.
         */
        call->u.server.cancel.had_pending = true;
#endif
    }
    call->u.server.cancel.count = 0;

    /*
     * Let the caller know if a cancel was pending (without them having
     * to look at the flag).
     */
    return (call->u.server.cancel.had_pending);
}

/*
**++
**
**  ROUTINE NAME:       rpc__cthread_cancel_enable_post
**
**  SCOPE:              PRIVATE - declared in comcthd.h
**
**  DESCRIPTION:        Enable direct posting of cancels to a cthread;
**                      post any previously queued cancels.
**
**  INPUTS:             none
**
**  INPUTS/OUTPUTS:
**
**      call            The call that the cancel is associated with.
**
**  OUTPUTS:            none
**
**  IMPLICIT INPUTS:    none
**
**  IMPLICIT OUTPUTS:   none
**
**  FUNCTION VALUE:     void
**
**  SIDE EFFECTS:
**      a cancel may be posted to the call execution thread
**
**--
**/

PRIVATE void rpc__cthread_cancel_enable_post
(
    rpc_call_rep_p_t call
)
{
    rpc_cthread_pvt_info_p_t pvt = &call->u.server.cthread;
    unsigned16 cancel_cnt;

    RPC_CALL_LOCK_ASSERT(call);

    RPC_MUTEX_LOCK (cthread_mutex);
    if (call->u.server.cancel.accepting
        && call->u.server.cancel.queuing)
    {
        call->u.server.cancel.queuing = false;
        for (cancel_cnt = call->u.server.cancel.count;
             cancel_cnt--;
            )
        {
            dcethread_interrupt_throw(pvt->thread_h);
        }
    }
    RPC_MUTEX_UNLOCK (cthread_mutex);
}
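
/*
 * Usage sketch (illustrative only, not part of the runtime; the thread
 * count, queue length and lookup function name are hypothetical, and
 * error handling is elided). A server that wants some calls handled by
 * their own threads might, before listening, do roughly:
 *
 *      rpc_thread_pool_handle_t pool;
 *      unsigned32 st;
 *
 *      rpc_server_create_thread_pool (4, &pool, &st);
 *      rpc_server_set_thread_pool_qlen (pool, 64, &st);
 *      rpc_server_set_thread_pool_fn (my_pool_lookup, &st);
 *      rpc_server_listen (5, &st);
 *
 * rpc_server_listen() starts the default pool and any reserved pools
 * (via rpc__cthread_start_all); returning from it stops them again
 * (rpc__cthread_stop_all), and a later rpc_server_listen() recreates
 * the call threads, as described at the top of this file.
 */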