1/* 2 * Copyright (c) 2010 Apple Inc. All rights reserved. 3 * 4 * @APPLE_LICENSE_HEADER_START@ 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. Neither the name of Apple Inc. ("Apple") nor the names of its 16 * contributors may be used to endorse or promote products derived from 17 * this software without specific prior written permission. 18 * 19 * THIS SOFTWARE IS PROVIDED BY APPLE AND ITS CONTRIBUTORS "AS IS" AND ANY 20 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 21 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 22 * DISCLAIMED. IN NO EVENT SHALL APPLE OR ITS CONTRIBUTORS BE LIABLE FOR ANY 23 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 24 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 26 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 28 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 * 30 * Portions of this software have been released under the following terms: 31 * 32 * (c) Copyright 1989-1993 OPEN SOFTWARE FOUNDATION, INC. 33 * (c) Copyright 1989-1993 HEWLETT-PACKARD COMPANY 34 * (c) Copyright 1989-1993 DIGITAL EQUIPMENT CORPORATION 35 * 36 * To anyone who acknowledges that this file is provided "AS IS" 37 * without any express or implied warranty: 38 * permission to use, copy, modify, and distribute this file for any 39 * purpose is hereby granted without fee, provided that the above 40 * copyright notices and this notice appears in all source code copies, 41 * and that none of the names of Open Software Foundation, Inc., Hewlett- 42 * Packard Company or Digital Equipment Corporation be used 43 * in advertising or publicity pertaining to distribution of the software 44 * without specific, written prior permission. Neither Open Software 45 * Foundation, Inc., Hewlett-Packard Company nor Digital 46 * Equipment Corporation makes any representations about the suitability 47 * of this software for any purpose. 48 * 49 * Copyright (c) 2007, Novell, Inc. All rights reserved. 50 * Redistribution and use in source and binary forms, with or without 51 * modification, are permitted provided that the following conditions 52 * are met: 53 * 54 * 1. Redistributions of source code must retain the above copyright 55 * notice, this list of conditions and the following disclaimer. 56 * 2. Redistributions in binary form must reproduce the above copyright 57 * notice, this list of conditions and the following disclaimer in the 58 * documentation and/or other materials provided with the distribution. 59 * 3. Neither the name of Novell Inc. nor the names of its contributors 60 * may be used to endorse or promote products derived from this 61 * this software without specific prior written permission. 62 * 63 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY 64 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 65 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 66 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY 67 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 68 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 69 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 70 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 71 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 72 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 73 * 74 * @APPLE_LICENSE_HEADER_END@ 75 */ 76 77/* 78** 79** NAME 80** 81** comcthd.c 82** 83** FACILITY: 84** 85** Remote Procedure Call (RPC) 86** 87** ABSTRACT: 88** 89** Definition of the Call Thread Services for the Common 90** Communication Services component. These routines permit 91** a call thread to be created and invoked. 92** 93** 94*/ 95 96#include <commonp.h> /* Common declarations for all RPC runtime */ 97#include <com.h> /* Common communications services */ 98#include <comprot.h> /* Common protocol services */ 99#include <comnaf.h> /* Common network address family services */ 100#include <comp.h> /* Private communications services */ 101#include <comcthd.h> /* Shared call thread services */ 102 103 104/* 105 * The multiplier to apply to a pool's n_threads to get the queue depth. 106 */ 107#ifndef RPC_C_CTHREAD_QUEUE_MULTIPLIER 108# define RPC_C_CTHREAD_QUEUE_MULTIPLIER 8 109#endif 110 111/* 112 * Call thread states 113 */ 114#define RPC_C_NO_CTHREAD 0 /* no thread exists */ 115#define RPC_C_IDLE_CTHREAD 1 /* thread exists but is idle */ 116#define RPC_C_ACTIVE_CTHREAD 2 /* thread has call allocated to it */ 117 118/* 119 * The call thread table element structure. 120 * See the "Call Thread Pool" description for more info... 121 * 122 * When a thread is idle, it is waiting on its private condition variable. 123 * 124 * The per element "pool" pointer simply solves the problem of needing 125 * the created thread to have two args (without having to alloc memory 126 * just for the create phase). The space really isn't "wasted" since 127 * since each thread would otherwise have to keep this var on their stack. 128 */ 129typedef struct 130{ 131 unsigned8 thread_state; 132 dcethread* thread_id; 133 rpc_cond_t thread_cond; 134 struct cthread_pool_elt_t *pool; 135 rpc_call_rep_p_t call_rep; 136} cthread_elt_t, *cthread_elt_p_t; 137 138/* 139 * Reserved Pool Call Queue Element. 140 * 141 * These queue elements form a backing structure that allows 142 * us to maintain a call_rep on two pool lists. 143 */ 144typedef struct { 145 rpc_list_t link; 146 struct cthread_pool_elt_t *pool; 147 rpc_call_rep_p_t call_rep; 148} cthread_queue_elt_t, *cthread_queue_elt_p_t; 149 150/* 151 * Call Thread Pools. 152 * 153 * READ THIS IF YOU WANT TO UNDERSTAND THIS STUFF! 154 * 155 * This structure exists to accomplish the desired effect of allowing 156 * applications to have calls execute using threads from some application 157 * defined set of thread pools. The application declared thread pools are 158 * refered to "reserved" pools (due to a previous incarnation of this 159 * code which provided -a too limited scheme of- thread reservation on 160 * per-interface basis). 161 * 162 * The application (via the pool lookup function callout) gets to decide 163 * which pool a call should be associated with. This general mechanism 164 * provides the application with the basic hooks to implement any one 165 * of a number of schemes for allocating calls to threads. 166 * 167 * If the application declares that the call should use a reserved pool, 168 * a free thread from that pool will be allocated, otherwise, a default 169 * pool free thread will be allocated, otherwise, the call will be put 170 * on the default and reserved pool queue for execution by the first 171 * available default or reserved pool thread. In any case, the calls 172 * for a reserved pool are assigned idle execution threads in the order 173 * in which they are received. 174 * 175 * The default pool is created by rpc_server_listen(). Non-reserved 176 * (default pool) threads can execute *any* call (the idle threads are 177 * assigned to calls in the order in which they are received). The total 178 * number of call threads in a server is the sum of the threads in the 179 * default pool and the reserved pools. 180 * 181 * The relationship between n_queued, max_queued and the call_queue 182 * requires some explanation. n_queued and max_queued represent the 183 * number and limit respectively of the number of call_queue entries 184 * FOR THIS POOL TYPE. For *reserved pools*, all of these variables 185 * make sense in an intuitive way: n_queued always represents the true 186 * number of elements on the queue and the number of elements on the 187 * queue will not exceed max_queued. 188 * 189 * The default pool's use of these variables is less intuitive since 190 * *all* queued calls are on the default pool's queue. In this case, 191 * the number of elements on the queue can *exceed* the default pool's 192 * n_queued and max_queued! n_queued and max_queued (as stated above) 193 * strictly represent the number of default pool calls (i.e. those calls 194 * that are not associated with a reserved pool). The end result is 195 * that this accomplishes the desired max queuing limitations - i.e. 196 * the maximums are imposed on a per pool basis; use the queue to 197 * to determine of there are actually any queued calls to process, 198 * NOT n_queued. 199 * 200 * The default pool uses its pool.call_queue to directly link call reps 201 * and it does not use the pool.free_queue. The reserved pools can't 202 * directly link the call reps (because call reps only have one rpc_list 203 * "thread", because the rpc_list macros only work for a single 204 * "thread"...). Therefore, the reserved pools each maintain their 205 * own (static) set of cthread_call_queue_elt_t elements. Reserved pools' 206 * call_queue consists of a set of call_queue_elts that point to the 207 * associated call_reps (which are on the default pool queue). The 208 * call_queue_elts are maintained on the pool's pool.free_queue when 209 * not in use on the pool.call_queue. 210 * 211 * Startup / Shutdown processing... We have a requirement that 212 * rpc_server_listen() can be called, return and then be called again 213 * (a startup, shutdown, restart sequence). All call threads should 214 * be terminated by a shutdown (to free up resources). All threads 215 * (including those requested by previous calls to 216 * rpc_server_create_thread_pool()) must be automatically recreated upon 217 * a restart. 218 * 219 * Pool creation is two step process. First, a pool descriptor is 220 * allocated (cthread_pool_alloc()). Subsequently, a pool may be "started" 221 * (cthread_pool_start()); this actually creates the call threads. 222 * 223 * Shutting down involves stopping all of the call threads in all of 224 * the pools (including freeing each pool's call thread table). The 225 * pool descriptors are not freed. This is necessary to retain information 226 * that is needed to restart the server. 227 */ 228 229typedef struct cthread_pool_elt_t { 230 rpc_list_t link; /* linked list of pools */ 231 unsigned16 n_threads; /* total number of threads in the pool */ 232 unsigned16 n_idle; /* number of idle (available) threads */ 233 cthread_elt_p_t ctbl; /* the cthreads associated with the pool */ 234 cthread_elt_p_t idle_cthread; /* pointer to a known idle cthread */ 235 unsigned32 n_queued; /* see above! */ 236 unsigned32 max_queued; /* see above! */ 237 rpc_list_t call_queue; /* see above!; list of calls queued */ 238 rpc_list_t free_queue; /* see above!; list of free call_queue elements */ 239 unsigned stop : 1; /* T => pool's threads stop when complete */ 240 unsigned queue_elt_alloc : 1; /* T => start() should allocate queue elts */ 241} cthread_pool_elt_t, *cthread_pool_elt_p_t; 242 243 244/* 245 * Pools are only associated with the MAJOR version of an interface. 246 */ 247 248#define IF_VERS_MAJOR(_vers) ((_vers) & 0xffff) 249#define IF_VERS_MINOR(_vers) (((_vers) >> 16) & 0xffff) 250 251/* 252 * A couple of macros for convienience. 253 */ 254 255#define CTHREAD_POOL_IS_QUEUE_FULL(p) ((p)->n_queued >= (p)->max_queued) 256#define CTHREAD_POOL_IS_QUEUE_EMPTY(p) (RPC_LIST_EMPTY ((p)->call_queue)) 257 258/* 259 * A couple of macros for (fast path) performance. 260 */ 261 262#define CTHREAD_POOL_LOOKUP_RESERVED(object, if_uuid, if_ver, if_opnum, p, st) \ 263 { \ 264 RPC_MUTEX_LOCK_ASSERT (cthread_mutex); \ 265 if (cthread_pool_lookup_fn == NULL) \ 266 { \ 267 *(p) = NULL; \ 268 *st = 0; \ 269 } \ 270 else \ 271 { \ 272 rpc_if_id_t if_id; \ 273 if_id.uuid = *(if_uuid); \ 274 if_id.vers_major = IF_VERS_MAJOR(if_ver); \ 275 if_id.vers_minor = IF_VERS_MINOR(if_ver); \ 276 (*cthread_pool_lookup_fn) (\ 277 object, &if_id, if_opnum, \ 278 (rpc_thread_pool_handle_t *)p, st); \ 279 } \ 280 } 281 282#define CTHREAD_POOL_ASSIGN_THREAD(p, ct) \ 283 { \ 284 RPC_MUTEX_LOCK_ASSERT (cthread_mutex); \ 285 if ((p)->idle_cthread != NULL) \ 286 { \ 287 *(ct) = (p)->idle_cthread; \ 288 (p)->idle_cthread = NULL; \ 289 assert((*(ct))->thread_state == RPC_C_IDLE_CTHREAD); \ 290 (*(ct))->thread_state = RPC_C_ACTIVE_CTHREAD; \ 291 (p)->n_idle--; \ 292 } \ 293 else \ 294 { \ 295 *(ct) = cthread_pool_assign_thread(p); \ 296 } \ 297 } 298 299#define CTHREAD_POOL_IDLE_THREAD(p, ct) \ 300 { \ 301 (p)->n_idle++; \ 302 (p)->idle_cthread = ct; \ 303 } 304 305 306 307/* 308 * The reserved pools. 309 * 310 * The pools are linked together via their pool.pool_list field. 311 */ 312INTERNAL rpc_list_t cthread_reserved_pools; 313 314/* 315 * A handle to the special default pool. 316 */ 317INTERNAL cthread_pool_elt_p_t cthread_default_pool; 318 319/* 320 * The maximum number of calls that will be queued for the default 321 * thread pool. This value is settable via the rpc_server_set_thread_pool_qlen 322 * function. If not set, a default value of 8 times the number of 323 * default pool threads is used. 324 */ 325INTERNAL unsigned32 cthread_default_call_queue_size; 326 327/* 328 * The "reaper's" pool queue and timer. 329 * 330 * The pools are linked together via their pool.pool_list field. 331 */ 332INTERNAL rpc_list_t cthread_reaper_queue; 333INTERNAL rpc_timer_t cthread_reaper_timer; 334 335#ifndef RPC_C_CTHREAD_REAPER_FREQ 336# define RPC_C_CTHREAD_REAPER_FREQ RPC_CLOCK_SEC(3*60) 337#endif 338 339/* 340 * cthread_mutex protects all of the cthread private structures. 341 */ 342INTERNAL rpc_mutex_t cthread_mutex; 343 344/* 345 * A global that controls the overall ability of RPCs to be assigned 346 * to a pool/thread for execution (i.e. it controls rpc__cthread_invoke_null). 347 */ 348INTERNAL boolean cthread_invoke_enabled; 349 350/* 351 * A global that points to the application specified thread pool lookup function. 352 */ 353INTERNAL rpc_thread_pool_fn_t cthread_pool_lookup_fn; 354 355 356INTERNAL void cthread_create ( 357 cthread_elt_p_t /*cthread*/, 358 unsigned32 * /*status*/ 359 ); 360 361INTERNAL void cthread_call_executor ( 362 cthread_elt_p_t /*cthread*/ 363 ); 364 365INTERNAL void cthread_reaper ( 366 dce_pointer_t /*arg*/ 367 ); 368 369INTERNAL cthread_pool_elt_p_t cthread_pool_alloc ( 370 unsigned32 /*n_threads*/, 371 boolean32 /*is_default_pool*/, 372 unsigned32 * /*status*/ 373 ); 374 375INTERNAL void cthread_pool_set_threadcnt ( 376 cthread_pool_elt_p_t /*p*/, 377 unsigned32 /*n_threads*/, 378 unsigned32 * /*status*/ 379 ); 380 381INTERNAL void cthread_pool_free ( 382 cthread_pool_elt_p_t /*p*/, 383 unsigned32 * /*status*/ 384 ); 385 386INTERNAL void cthread_pool_start ( 387 cthread_pool_elt_p_t /*p*/, 388 unsigned32 * /*status*/ 389 ); 390 391INTERNAL void cthread_pool_stop ( 392 cthread_pool_elt_p_t /*p*/, 393 unsigned32 /*wait_flag*/, 394 unsigned32 * /*status*/ 395 ); 396 397INTERNAL cthread_elt_p_t cthread_pool_assign_thread ( 398 cthread_pool_elt_p_t /*p*/ 399 ); 400 401INTERNAL void cthread_pool_queue_call ( 402 cthread_pool_elt_p_t /*p*/, 403 rpc_call_rep_p_t /*call_rep*/, 404 unsigned32 * /*status*/ 405 ); 406 407INTERNAL rpc_call_rep_p_t cthread_pool_dequeue_first ( 408 cthread_pool_elt_p_t /*p*/ 409 ); 410 411INTERNAL boolean32 cthread_call_dequeue ( 412 rpc_call_rep_p_t /*call_rep*/ 413 ); 414 415 416/* 417**++ 418** 419** ROUTINE NAME: cthread_create 420** 421** SCOPE: INTERNAL 422** 423** DESCRIPTION: 424** 425** Create a call thread and initialize the table entry. 426** 427** INPUTS: 428** 429** cthread The cthread_table entry to use. 430** 431** INPUTS/OUTPUTS: none 432** 433** OUTPUTS: 434** 435** status A value indicating the status of the routine. 436** 437** rpc_s_ok 438** 439** IMPLICIT INPUTS: none 440** 441** IMPLICIT OUTPUTS: none 442** 443** FUNCTION VALUE: none 444** 445** SIDE EFFECTS: none 446** 447**-- 448**/ 449 450INTERNAL void cthread_create 451( 452 cthread_elt_p_t volatile cthread, 453 unsigned32 *status 454) 455{ 456 dcethread* handle_copy; 457 458 RPC_MUTEX_LOCK_ASSERT (cthread_mutex); 459 460 CODING_ERROR (status); 461 462 /* 463 * Create a thread for this entry, passing it a pointer its 464 * call thread table entry. Detach the thread since no one 465 * ever joins with the thread (and we don't want it to become 466 * forever zombie'd when it terminates). 467 */ 468 DCETHREAD_TRY { 469 dcethread_create_throw (&cthread->thread_id, 470 &rpc_g_server_dcethread_attr, 471 (dcethread_startroutine)cthread_call_executor, 472 (dcethread_addr)cthread); 473 474 cthread->thread_state = RPC_C_IDLE_CTHREAD; 475 476 handle_copy = cthread->thread_id; 477 dcethread_detach_throw(handle_copy); 478 479 *status = rpc_s_ok; 480 } DCETHREAD_CATCH_ALL(THIS_CATCH) { 481 *status = rpc_s_cthread_create_failed; 482 /* FIXME MNE */ 483 fprintf(stderr, "XXX MIREK: %s: %s: %d: cthread creation failure\n", 484 __FILE__, __PRETTY_FUNCTION__, __LINE__); 485 } DCETHREAD_ENDTRY 486 487 return; 488} 489 490/* 491**++ 492** 493** ROUTINE NAME: cthread_call_executor 494** 495** SCOPE: INTERNAL 496** 497** DESCRIPTION: 498** 499** The base routine of all call executor threads. Loop awaiting 500** and processing calls until told to stop. 501** 502** INPUTS: 503** 504** cthread Pointer to the thread's call table element 505** 506** INPUTS/OUTPUTS: none 507** 508** OUTPUTS: none 509** 510** IMPLICIT INPUTS: none 511** 512** IMPLICIT OUTPUTS: none 513** 514** FUNCTION VALUE: none 515** 516** SIDE EFFECTS: none 517** 518**-- 519**/ 520 521INTERNAL void cthread_call_executor 522( 523 cthread_elt_p_t cthread 524) 525{ 526 rpc_call_rep_t *call_rep = NULL; 527 rpc_cthread_pvt_info_p_t pvt = NULL; 528 cthread_pool_elt_p_t p = cthread->pool; 529 boolean skip_startup = true; 530 531 /* 532 * Call executors execute with general cancelability disabled 533 * until the stub dispatched to the manager. This prevents the 534 * call executor from having a pending cancel delivered to it before 535 * the manager is called. 536 */ 537 dcethread_enableinterrupt_throw(0); 538 539 RPC_MUTEX_LOCK (cthread_mutex); 540 541 if (CTHREAD_POOL_IS_QUEUE_EMPTY(p)) 542 { 543 skip_startup = false; 544 } 545 546 /* 547 * Loop executing calls until we're told to exit. 548 */ 549 while (true) 550 { 551 boolean run_queued_call = false; 552 553 if (!skip_startup) 554 { 555 /* 556 * Update the pool's idle thread info. 557 */ 558 CTHREAD_POOL_IDLE_THREAD(p, cthread); 559 560 /* 561 * Wait for a call assignment (or until we're told to exit). 562 */ 563 while (cthread->thread_state == RPC_C_IDLE_CTHREAD && ! p->stop) 564 { 565 RPC_COND_WAIT (cthread->thread_cond, cthread_mutex); 566 } 567 568 /* 569 * If we've been told to stop, then do so. 570 */ 571 if (p->stop) 572 { 573 break; 574 } 575 576 /* 577 * Setup the call that was assigned to us. 578 */ 579 call_rep = cthread->call_rep; 580 assert(call_rep != NULL); 581 pvt = &call_rep->u.server.cthread; 582 } 583 /* 584 * Execute the call assigned to us, followed by any queued calls. 585 */ 586 do 587 { 588 if (!skip_startup) 589 { 590 RPC_DBG_PRINTF (rpc_e_dbg_general, 15, 591 ("(cthread_call_executor) pool %p cthread %p executing call_rep %p\n", 592 p, cthread, call_rep)); 593 594 /* 595 * Unlock the cthread_mutex while the call is executing. 596 */ 597 RPC_MUTEX_UNLOCK (cthread_mutex); 598 599 /* 600 * Invoke the routine provided when this thread was invoked 601 * with the argument provided. The routine is always called 602 * with general cancelability disabled AND WITH THE CALL LOCKED. 603 * Since we don't have reference counts (in the common code) 604 * this call reference and lock is 'handed off' to the routine 605 * (which is responsible for releasing the lock). Upon completion 606 * of the 'routine' we can no longer reference the call (it may 607 * no longer exist). 608 */ 609 610 RPC_CALL_LOCK(cthread->call_rep); 611 (*(pvt->executor)) (pvt->optargs, run_queued_call); 612 613 /* 614 * Reacquire the cthread mutex and check for queued calls. 615 * As the above somment sez; we no longer hold the call lock 616 * at this point. 617 */ 618 619 RPC_MUTEX_LOCK (cthread_mutex); 620 } 621 /* 622 * Select the oldest queued call; remove it from its queue(s) 623 * and setup to execute it. 624 */ 625 626 skip_startup = false; 627 if (CTHREAD_POOL_IS_QUEUE_EMPTY(p)) 628 { 629 run_queued_call = false; 630 continue; 631 } 632 call_rep = cthread_pool_dequeue_first(p); 633 pvt = &call_rep->u.server.cthread; 634 635 /* 636 * Fill in the thread_h of the protocol specific call 637 * handle for use by the protocol module. 638 */ 639 pvt->thread_h = cthread->thread_id; 640 641 /* 642 * Update the cthread table entry for this call just to be 643 * consistent. 644 */ 645 cthread->call_rep = call_rep; 646 647 /* 648 * Indicate there's a queued call to process. 649 */ 650 run_queued_call = true; 651 } while (run_queued_call); 652 653 /* 654 * Free up this thread to be allocated again. 655 */ 656 cthread->thread_state = RPC_C_IDLE_CTHREAD; 657 } 658 659 RPC_DBG_PRINTF (rpc_e_dbg_general, 5, 660 ("(cthread_call_executor) pool %p cthread %p stopped\n", 661 p, cthread)); 662 663 /* 664 * Notify others that the cthread is exiting. 665 */ 666 667 cthread->thread_state = RPC_C_NO_CTHREAD; 668 RPC_COND_BROADCAST (cthread->thread_cond, cthread_mutex); 669 670 RPC_MUTEX_UNLOCK (cthread_mutex); 671} 672 673 674/* 675**++ 676** 677** ROUTINE NAME: cthread_reaper 678** 679** SCOPE: INTERNAL 680** 681** DESCRIPTION: 682** 683** Free pools as they become idle 684** (this is run periodically from the timer thread). 685** 686** INPUTS: none 687** 688** INPUTS/OUTPUTS: none 689** 690** OUTPUTS: none 691** 692** IMPLICIT INPUTS: 693** cthread_reaper_queue the queue of waiting to be freed pools 694** 695** IMPLICIT OUTPUTS: none 696** 697** FUNCTION VALUE: none 698** 699** SIDE EFFECTS: none 700** 701**-- 702**/ 703 704INTERNAL void cthread_reaper 705( 706 dce_pointer_t unused_arg ATTRIBUTE_UNUSED 707) 708{ 709 cthread_pool_elt_p_t p, np; 710 unsigned32 i; 711 unsigned32 st; 712 cthread_elt_p_t cthread; 713 boolean free_pool; 714 715 RPC_MUTEX_LOCK (cthread_mutex); 716 717 /* 718 * Scan the queue looking for (and freeing) idle pools. 719 */ 720 RPC_LIST_FIRST(cthread_reaper_queue, p, cthread_pool_elt_p_t); 721 while (p != NULL) 722 { 723 free_pool = true; 724 if (p->ctbl != NULL) 725 { 726 /* 727 * See if all of the pool's threads have completed. 728 */ 729 for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++) 730 { 731 if (cthread->thread_state != RPC_C_NO_CTHREAD) 732 { 733 free_pool = false; 734 break; 735 } 736 } 737 } 738 739 if (! free_pool) 740 { 741 RPC_LIST_NEXT (p, p, cthread_pool_elt_p_t); 742 continue; 743 } 744 745 RPC_DBG_PRINTF (rpc_e_dbg_general, 5, 746 ("(cthread_reaper) freeing pool %p\n", p)); 747 748 /* 749 * Remove the pool from the reaper's queue (pool free really 750 * frees the storage)... but first, determine the "next pool" 751 * so we can continue the scan. 752 */ 753 754 RPC_LIST_NEXT (p, np, cthread_pool_elt_p_t); 755 RPC_LIST_REMOVE (cthread_reaper_queue, p); 756 757 /* 758 * Free up the pool's descriptor. 759 */ 760 cthread_pool_free(p, &st); 761 762 /* 763 * Continue scanning with the next on the list. 764 */ 765 p = np; 766 } 767 768 /* 769 * Shutdown the reaper timer when there's nothing to reap. 770 */ 771 if (RPC_LIST_EMPTY(cthread_reaper_queue)) 772 rpc__timer_clear(&cthread_reaper_timer); 773 774 RPC_MUTEX_UNLOCK (cthread_mutex); 775} 776 777/* 778**++ 779** 780** ROUTINE NAME: cthread_pool_alloc 781** 782** SCOPE: INTERNAL 783** 784** DESCRIPTION: 785** 786** Allocate the resources for a pool (cthread_pool_start() actually creates the 787** threads). 788** 789** INPUTS: 790** 791** n_threads number of call threads in the pool 792** 793** INPUTS/OUTPUTS: none 794** 795** OUTPUTS: 796** 797** status A value indicating the status of the routine. 798** 799** rpc_s_ok 800** 801** IMPLICIT INPUTS: none 802** 803** IMPLICIT OUTPUTS: none 804** 805** FUNCTION VALUE: 806** 807** p the created pool 808** 809** SIDE EFFECTS: none 810** 811**-- 812**/ 813 814INTERNAL cthread_pool_elt_p_t cthread_pool_alloc 815( 816 unsigned32 n_threads, 817 boolean32 is_default_pool, 818 unsigned32 *status 819) 820{ 821 cthread_pool_elt_p_t p = NULL; 822 823 CODING_ERROR (status); 824 825 RPC_MUTEX_LOCK_ASSERT (cthread_mutex); 826 827 /* 828 * Check for the sanity of the number of threads. 829 */ 830 if (n_threads == 0) 831 { 832 *status = rpc_s_max_calls_too_small; 833 return p; 834 } 835 836 /* 837 * Alloc a pool descriptor. 838 */ 839 RPC_MEM_ALLOC (p, 840 cthread_pool_elt_p_t, 841 sizeof (cthread_pool_elt_t), 842 RPC_C_MEM_CTHREAD_POOL, 843 RPC_C_MEM_WAITOK); 844 845 if (p == NULL) 846 { 847 *status = rpc_s_no_memory; 848 return p; 849 } 850 851 /* 852 * Init the fields in the pool descriptor. 853 */ 854 RPC_LIST_INIT (p->link); 855 p->n_threads = n_threads; 856 p->n_idle = 0; 857 p->ctbl = NULL; 858 p->idle_cthread = NULL; 859 p->n_queued = 0; 860 861 /* 862 * If the application has indicated a preference for the call queue depth 863 * of the default pool, use that. Otherwise, default to 8 times the number 864 * of threads in the pool. 865 */ 866 if (is_default_pool && cthread_default_call_queue_size != 0) 867 p->max_queued = cthread_default_call_queue_size; 868 else 869 p->max_queued = RPC_C_CTHREAD_QUEUE_MULTIPLIER * n_threads; 870 871 RPC_LIST_INIT (p->call_queue); 872 RPC_LIST_INIT (p->free_queue); 873 p->stop = false; 874 p->queue_elt_alloc = ! is_default_pool; 875 876 *status = rpc_s_ok; 877 878/* 879CLEANUP: 880*/ 881 if (*status != rpc_s_ok) 882 { 883 if (p != NULL) 884 RPC_MEM_FREE (p, RPC_C_MEM_CTHREAD_POOL); 885 p = NULL; 886 } 887 888 return p; 889} 890 891/* 892**++ 893** 894** ROUTINE NAME: cthread_pool_set_threadcnt 895** 896** SCOPE: INTERNAL 897** 898** DESCRIPTION: 899** 900** Modify the number of threads associated with the pool 901** This is not intended to generically work; this is only 902** suppose to work on "idle" pools (alloc'ed but not started, 903** or started and then stopped). 904** 905** INPUTS: 906** 907** p the pool who's count to modify 908** n_threads the new number of threads 909** 910** INPUTS/OUTPUTS: none 911** 912** OUTPUTS: 913** 914** status A value indicating the status of the routine. 915** 916** rpc_s_ok 917** 918** IMPLICIT INPUTS: none 919** 920** IMPLICIT OUTPUTS: none 921** 922** FUNCTION VALUE: none 923** 924** SIDE EFFECTS: none 925** 926**-- 927**/ 928 929INTERNAL void cthread_pool_set_threadcnt 930( 931 cthread_pool_elt_p_t p, 932 unsigned32 n_threads, 933 unsigned32 *status 934) 935{ 936 CODING_ERROR (status); 937 938 RPC_MUTEX_LOCK_ASSERT (cthread_mutex); 939 940 /* 941 * Check for the sanity of the number of threads. 942 */ 943 if (n_threads == 0) 944 { 945 *status = rpc_s_max_calls_too_small; 946 return; 947 } 948 949 p->n_threads = n_threads; 950 951 /* 952 * Use a default call queue size if we're operating on a private pool, 953 * or if this is the default pool and the application hasn't previously 954 * specified a default call queue size for the default pool. 955 */ 956 if (p != cthread_default_pool || cthread_default_call_queue_size == 0) 957 p->max_queued = RPC_C_CTHREAD_QUEUE_MULTIPLIER * n_threads; 958 959 *status = rpc_s_ok; 960} 961 962/* 963**++ 964** 965** ROUTINE NAME: cthread_pool_free 966** 967** SCOPE: INTERNAL 968** 969** DESCRIPTION: 970** 971** Free the (assumed idle) pool's resources. 972** 973** INPUTS: 974** 975** p the pool to free 976** 977** INPUTS/OUTPUTS: none 978** 979** OUTPUTS: 980** 981** status A value indicating the status of the routine. 982** 983** rpc_s_ok 984** 985** IMPLICIT INPUTS: none 986** 987** IMPLICIT OUTPUTS: none 988** 989** FUNCTION VALUE: none 990** 991** SIDE EFFECTS: none 992** 993**-- 994**/ 995 996INTERNAL void cthread_pool_free 997( 998 cthread_pool_elt_p_t p, 999 unsigned32 *status 1000) 1001{ 1002 unsigned32 i; 1003 cthread_elt_p_t cthread; 1004 1005 CODING_ERROR (status); 1006 1007 RPC_MUTEX_LOCK_ASSERT (cthread_mutex); 1008 1009 /* 1010 * The assumption is that the pool is idle (all of its threads 1011 * have terminated). 1012 */ 1013 1014 /* 1015 * Clean up and free the ctbl. If there is a ctbl, the assumption 1016 * is that all of the ctable's entries have valid initialized cv's. 1017 */ 1018 if (p->ctbl) 1019 { 1020 for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++) 1021 { 1022 RPC_COND_DELETE (cthread->thread_cond, cthread_mutex); 1023 } 1024 RPC_MEM_FREE (p->ctbl, RPC_C_MEM_CTHREAD_CTBL); 1025 p->ctbl = NULL; 1026 } 1027 1028 /* 1029 * Free up the queue elt table. 1030 */ 1031 while (! RPC_LIST_EMPTY(p->free_queue)) 1032 { 1033 cthread_queue_elt_p_t qe; 1034 1035 RPC_LIST_REMOVE_TAIL(p->free_queue, qe, cthread_queue_elt_p_t); 1036 RPC_MEM_FREE (qe, RPC_C_MEM_CTHREAD_QETBL); 1037 } 1038 1039 /* 1040 * Free the pool descriptor. 1041 */ 1042 RPC_MEM_FREE (p, RPC_C_MEM_CTHREAD_POOL); 1043 1044 *status = rpc_s_ok; 1045} 1046 1047/* 1048**++ 1049** 1050** ROUTINE NAME: cthread_pool_start 1051** 1052** SCOPE: INTERNAL 1053** 1054** DESCRIPTION: 1055** 1056** Start up the call execution threads for an existing pool. 1057** 1058** INPUTS: 1059** 1060** p the pool to start 1061** n_threads number of call threads in the pool 1062** 1063** INPUTS/OUTPUTS: none 1064** 1065** OUTPUTS: 1066** 1067** status A value indicating the status of the routine. 1068** 1069** rpc_s_ok 1070** 1071** IMPLICIT INPUTS: none 1072** 1073** IMPLICIT OUTPUTS: none 1074** 1075** FUNCTION VALUE: none 1076** 1077** SIDE EFFECTS: none 1078** 1079**-- 1080**/ 1081 1082INTERNAL void cthread_pool_start 1083( 1084 cthread_pool_elt_p_t p, 1085 unsigned32 *status 1086) 1087{ 1088 unsigned32 i; 1089 cthread_elt_p_t cthread; 1090 1091 CODING_ERROR (status); 1092 1093 RPC_MUTEX_LOCK_ASSERT (cthread_mutex); 1094 1095 /* 1096 * The pool should not currently have any actual call threads. 1097 */ 1098 if (p->ctbl != NULL) 1099 { 1100 RPC_DBG_GPRINTF ( 1101 ("(cthread_pool_start) pool %p orphaning ctbl\n", p)); 1102 } 1103 1104 /* 1105 * Allocate the pool's call thread table. 1106 */ 1107 RPC_MEM_ALLOC (p->ctbl, 1108 cthread_elt_p_t, 1109 p->n_threads * (sizeof (cthread_elt_t)), 1110 RPC_C_MEM_CTHREAD_CTBL, 1111 RPC_C_MEM_WAITOK); 1112 1113 if (p->ctbl == NULL) 1114 { 1115 *status = rpc_s_no_memory; 1116 return; 1117 } 1118 1119 /* 1120 * Init the pool's cthread table / create the cthreads. 1121 * Do this in two phases to ensure that the table is 1122 * sane in the event that thread creation fails and cleanup 1123 * is necessary. 1124 */ 1125 1126 for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++) 1127 { 1128 cthread->pool = p; 1129 cthread->thread_state = RPC_C_NO_CTHREAD; 1130 RPC_COND_INIT (cthread->thread_cond, cthread_mutex); 1131 } 1132 1133 for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++) 1134 { 1135 cthread_create(cthread, status); 1136 if (*status != rpc_s_ok) 1137 { 1138 RPC_DBG_GPRINTF ( 1139 ("(cthread_pool_start) pool %p couldn't create thread %d\n", p, i)); 1140 goto CLEANUP; 1141 } 1142 } 1143 1144 /* 1145 * Setup additional fields in the pool descriptor. 1146 */ 1147 p->n_idle = 0; 1148 p->idle_cthread = NULL; 1149 p->n_queued = 0; 1150 RPC_LIST_INIT (p->call_queue); 1151 RPC_LIST_INIT (p->free_queue); 1152 1153 /* 1154 * Allocate the pool's queue elements if necessary. 1155 */ 1156 if (p->queue_elt_alloc) 1157 { 1158 for (i = 0; i < p->max_queued; i++) 1159 { 1160 cthread_queue_elt_p_t qe; 1161 1162 RPC_MEM_ALLOC (qe, 1163 cthread_queue_elt_p_t, 1164 sizeof (cthread_queue_elt_t), 1165 RPC_C_MEM_CTHREAD_QETBL, 1166 RPC_C_MEM_WAITOK); 1167 1168 if (qe == NULL) 1169 { 1170 *status = rpc_s_no_memory; 1171 goto CLEANUP; 1172 } 1173 1174 qe->pool = p; 1175 RPC_LIST_ADD_TAIL (p->free_queue, qe, cthread_queue_elt_p_t); 1176 } 1177 } 1178 1179 RPC_DBG_PRINTF (rpc_e_dbg_general, 5, 1180 ("(cthread_pool_start) pool %p (%d threads)\n", p, p->n_threads)); 1181 1182 /* 1183 * Tell the pool's threads to start. 1184 */ 1185 p->stop = false; 1186 1187 *status = rpc_s_ok; 1188 1189CLEANUP: 1190 1191 if (*status != rpc_s_ok) 1192 { 1193 unsigned32 st; 1194 1195 if (p->ctbl != NULL) 1196 { 1197 cthread_pool_stop(p, true /* wait */, &st); 1198 p->ctbl = NULL; 1199 } 1200 1201 while (! RPC_LIST_EMPTY(p->free_queue)) 1202 { 1203 cthread_queue_elt_p_t qe; 1204 1205 RPC_LIST_REMOVE_TAIL(p->free_queue, qe, cthread_queue_elt_p_t); 1206 RPC_MEM_FREE (qe, RPC_C_MEM_CTHREAD_QETBL); 1207 } 1208 } 1209} 1210 1211/* 1212**++ 1213** 1214** ROUTINE NAME: cthread_pool_stop 1215** 1216** SCOPE: INTERNAL 1217** 1218** DESCRIPTION: 1219** 1220** Stop the pool's call threadas. 1221** 1222** INPUTS: 1223** 1224** p the pool to stop 1225** wait_flag T => wait for threads to stop 1226** 1227** INPUTS/OUTPUTS: none 1228** 1229** OUTPUTS: 1230** 1231** status A value indicating the status of the routine. 1232** 1233** rpc_s_ok 1234** 1235** IMPLICIT INPUTS: none 1236** 1237** IMPLICIT OUTPUTS: none 1238** 1239** FUNCTION VALUE: none 1240** 1241** SIDE EFFECTS: none 1242** 1243**-- 1244**/ 1245 1246INTERNAL void cthread_pool_stop 1247( 1248 cthread_pool_elt_p_t p, 1249 unsigned32 wait_flag, 1250 unsigned32 *status 1251) 1252{ 1253 int cs; 1254 unsigned32 i; 1255 cthread_elt_p_t cthread; 1256 1257 CODING_ERROR (status); 1258 1259 RPC_MUTEX_LOCK_ASSERT (cthread_mutex); 1260 1261 /* 1262 * If there are no threads associated with the pool, we're done. 1263 */ 1264 if (p->ctbl == NULL) 1265 { 1266 *status = rpc_s_ok; 1267 return; 1268 } 1269 1270 RPC_DBG_PRINTF (rpc_e_dbg_general, 5, 1271 ("(cthread_pool_stop) pool %p (%d threads) stopping\n", 1272 p, p->n_threads)); 1273 1274 /* 1275 * Tell the threads to stop when they complete the current activities. 1276 */ 1277 p->stop = true; 1278 1279 /* 1280 * Unblock any waiting call threads so they detect the 'stop' condition. 1281 */ 1282 for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++) 1283 { 1284 if (cthread->thread_state != RPC_C_NO_CTHREAD) 1285 { 1286 RPC_COND_SIGNAL (cthread->thread_cond, cthread_mutex); 1287 } 1288 } 1289 1290 /* 1291 * If not waiting, we're done. 1292 */ 1293 if (!wait_flag) 1294 { 1295 *status = rpc_s_ok; 1296 return; 1297 } 1298 1299 /* 1300 * Disable cancel delivery while awaiting cthread termination. This 1301 * ensures completion and preservation of invariants. If it becomes 1302 * necessary, we can allow cancels and setup a cleanup handler and 1303 * in the event of a cancel, queue the pool to the reaper for final 1304 * cleanup. 1305 */ 1306 cs = dcethread_enableinterrupt_throw (0); 1307 1308 /* 1309 * Wait for all call threads to complete. 1310 * 1311 * We wait on the call thread's private cv; the cthread signals its 1312 * cv prior to exiting. While dcethread_join() would have done the 1313 * trick; this scheme works just as well and is portable to environments 1314 * that may have difficulty implementing join (i.e. for Kernel RPC). 1315 */ 1316 for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++) 1317 { 1318 while (cthread->thread_state != RPC_C_NO_CTHREAD) 1319 { 1320 RPC_COND_WAIT (cthread->thread_cond, cthread_mutex); 1321 } 1322 } 1323 1324 /* 1325 * Restore the cancel state. 1326 */ 1327 dcethread_enableinterrupt_throw (cs); 1328 1329 RPC_DBG_PRINTF (rpc_e_dbg_general, 5, 1330 ("(cthread_pool_stop) pool %p (%d threads) stopped\n", 1331 p, p->n_threads)); 1332 1333 /* 1334 * Clean up and free the ctbl. If there is a ctbl, the assumption 1335 * is that all of the ctable's entries have valid initialized cv's. 1336 */ 1337 for (i = 0, cthread = &p->ctbl[0]; i < p->n_threads; i++, cthread++) 1338 { 1339 RPC_COND_DELETE (cthread->thread_cond, cthread_mutex); 1340 } 1341 RPC_MEM_FREE (p->ctbl, RPC_C_MEM_CTHREAD_CTBL); 1342 p->ctbl = NULL; 1343 1344 /* 1345 * Free up the queue elt list. 1346 */ 1347 while (! RPC_LIST_EMPTY(p->free_queue)) 1348 { 1349 cthread_queue_elt_p_t qe; 1350 1351 RPC_LIST_REMOVE_TAIL(p->free_queue, qe, cthread_queue_elt_p_t); 1352 RPC_MEM_FREE (qe, RPC_C_MEM_CTHREAD_QETBL); 1353 } 1354 1355 *status = rpc_s_ok; 1356} 1357 1358/* 1359**++ 1360** 1361** ROUTINE NAME: cthread_pool_assign_thread 1362** 1363** SCOPE: INTERNAL 1364** 1365** DESCRIPTION: 1366** 1367** Locate an idle thread in the indicated pool. 1368** 1369** INPUTS: 1370** 1371** p the pool to search 1372** 1373** INPUTS/OUTPUTS: none 1374** 1375** OUTPUTS: 1376** 1377** status A value indicating the status of the routine. 1378** 1379** rpc_s_ok 1380** 1381** IMPLICIT INPUTS: none 1382** 1383** IMPLICIT OUTPUTS: none 1384** 1385** FUNCTION VALUE: 1386** 1387** cthread the assigned thread (NULL if none found) 1388** 1389** SIDE EFFECTS: none 1390** 1391**-- 1392**/ 1393 1394INTERNAL cthread_elt_p_t cthread_pool_assign_thread 1395( 1396 cthread_pool_elt_p_t p 1397) 1398{ 1399 cthread_elt_p_t cthread = NULL; 1400 1401 RPC_MUTEX_LOCK_ASSERT (cthread_mutex); 1402 1403 /* 1404 * Locate an idle call thread (if one exists). 1405 */ 1406 if (p->n_idle > 0) 1407 { 1408 if (p->idle_cthread != NULL) 1409 { 1410 cthread = p->idle_cthread; 1411 assert(cthread->thread_state == RPC_C_IDLE_CTHREAD); 1412 p->idle_cthread = NULL; 1413 } 1414 else 1415 { 1416 cthread_elt_p_t ct; 1417 1418 for (ct = p->ctbl; ct < &p->ctbl[p->n_threads]; ct++) 1419 { 1420 if (ct->thread_state == RPC_C_IDLE_CTHREAD) 1421 { 1422 cthread = ct; 1423 break; 1424 } 1425 } 1426 } 1427 } 1428 1429 if (cthread != NULL) 1430 { 1431 cthread->thread_state = RPC_C_ACTIVE_CTHREAD; 1432 p->n_idle--; 1433 } 1434 1435 return cthread; 1436} 1437 1438/* 1439**++ 1440** 1441** ROUTINE NAME: cthread_pool_queue_call 1442** 1443** SCOPE: INTERNAL 1444** 1445** DESCRIPTION: 1446** 1447** Attempt to queue a call for deferred execution. 1448** 1449** INPUTS: 1450** 1451** p the call's pool 1452** call_rep the call 1453** 1454** INPUTS/OUTPUTS: none 1455** 1456** OUTPUTS: 1457** 1458** status A value indicating the status of the routine. 1459** 1460** rpc_s_ok 1461** rpc_s_cthread_not_found 1462** 1463** IMPLICIT INPUTS: none 1464** 1465** IMPLICIT OUTPUTS: none 1466** 1467** FUNCTION VALUE: none 1468** 1469** SIDE EFFECTS: none 1470** 1471**-- 1472**/ 1473 1474INTERNAL void cthread_pool_queue_call 1475( 1476 cthread_pool_elt_p_t p, 1477 rpc_call_rep_p_t call_rep, 1478 unsigned32 *status 1479) 1480{ 1481 rpc_cthread_pvt_info_p_t pvt = &call_rep->u.server.cthread; 1482 boolean is_default_pool = (p == cthread_default_pool); 1483 1484 CODING_ERROR (status); 1485 1486 RPC_MUTEX_LOCK_ASSERT (cthread_mutex); 1487 1488 /* 1489 * If the queue is full, we're done. 1490 */ 1491 if (CTHREAD_POOL_IS_QUEUE_FULL (p)) 1492 { 1493 RPC_DBG_GPRINTF (( 1494 "(cthread_pool_queue_call) pool %p full call_rep %p\n", p, call_rep)); 1495 *status = rpc_s_cthread_not_found; 1496 return; 1497 } 1498 1499 /* 1500 * Indicate that the call is queued. 1501 */ 1502 pvt->is_queued = true; 1503 1504 /* 1505 * Always add the call to the default pool's queue. 1506 * 1507 * ONLY Update the default pool's n_queued if the call is for the 1508 * default pool (see the cthread_pool_elt description comments above)! 1509 */ 1510 RPC_LIST_ADD_TAIL (cthread_default_pool->call_queue, 1511 call_rep, rpc_call_rep_p_t); 1512 if (is_default_pool) 1513 { 1514 pvt->qelt = NULL; 1515 p->n_queued++; 1516 } 1517 1518 /* 1519 * If it's a reserved pool, add it to its queue too. 1520 */ 1521 if (! is_default_pool) 1522 { 1523 cthread_queue_elt_p_t qelt; 1524 1525 RPC_LIST_REMOVE_HEAD(p->free_queue, qelt, cthread_queue_elt_p_t); 1526 assert (qelt != NULL); 1527 1528 qelt->call_rep = call_rep; 1529 pvt->qelt = (dce_pointer_t)qelt; 1530 1531 RPC_LIST_ADD_TAIL (p->call_queue, qelt, cthread_queue_elt_p_t); 1532 p->n_queued++; 1533 } 1534 1535 RPC_DBG_PRINTF (rpc_e_dbg_general, 5, 1536 ("(cthread_pool_queue_call) pool %p (now %d) call_rep %p\n", 1537 p, p->n_queued, call_rep)); 1538 1539 *status = rpc_s_ok; 1540} 1541 1542/* 1543**++ 1544** 1545** ROUTINE NAME: cthread_pool_dequeue_first 1546** 1547** SCOPE: INTERNAL 1548** 1549** DESCRIPTION: Remove the first queued call rep from a pool. 1550** 1551** INPUTS: none 1552** 1553** INPUTS/OUTPUTS: 1554** 1555** p The pool of interest 1556** 1557** OUTPUTS: none 1558** 1559** IMPLICIT INPUTS: none 1560** 1561** IMPLICIT OUTPUTS: none 1562** 1563** FUNCTION VALUE: 1564** 1565** call_rep The dequeued call rep (may be NULL). 1566** 1567** SIDE EFFECTS: none 1568** 1569**-- 1570**/ 1571 1572INTERNAL rpc_call_rep_p_t cthread_pool_dequeue_first 1573( 1574 cthread_pool_elt_p_t p 1575) 1576{ 1577 rpc_call_rep_p_t call_rep; 1578 boolean is_default_pool = (p == cthread_default_pool); 1579 1580 RPC_MUTEX_LOCK_ASSERT (cthread_mutex); 1581 1582 /* 1583 * If the queue is empty we're done. 1584 */ 1585 if (CTHREAD_POOL_IS_QUEUE_EMPTY(p)) 1586 { 1587 return NULL; 1588 } 1589 1590 /* 1591 * Determine the call rep of interest and then dequeue it. 1592 */ 1593 if (is_default_pool) 1594 { 1595 /* 1596 * The default pool's queue is the queue of call reps. 1597 */ 1598 RPC_LIST_FIRST (p->call_queue, 1599 call_rep, 1600 rpc_call_rep_p_t); 1601 } 1602 else 1603 { 1604 cthread_queue_elt_p_t qelt; 1605 1606 /* 1607 * The call was really for a reserved pool; determine the 1608 * call rep via the indirection queue elt. 1609 */ 1610 RPC_LIST_FIRST (p->call_queue, 1611 qelt, 1612 cthread_queue_elt_p_t); 1613 1614 call_rep = qelt->call_rep; 1615 assert ((cthread_queue_elt_p_t)call_rep->u.server.cthread.qelt == qelt); 1616 } 1617 1618 (void) cthread_call_dequeue (call_rep); 1619 1620 return call_rep; 1621} 1622 1623/* 1624**++ 1625** 1626** ROUTINE NAME: cthread_call_dequeue 1627** 1628** SCOPE: INTERNAL 1629** 1630** DESCRIPTION: Remove a call rep from the call executor 1631** thread waiting queue, if it's there. 1632** 1633** INPUTS: none 1634** 1635** INPUTS/OUTPUTS: 1636** 1637** call_rep The call rep to be dequeued. 1638** 1639** OUTPUTS: none 1640** 1641** IMPLICIT INPUTS: none 1642** 1643** IMPLICIT OUTPUTS: none 1644** 1645** FUNCTION VALUE: boolean 1646** 1647** T => call was previously queued. 1648** 1649** SIDE EFFECTS: none 1650** 1651**-- 1652**/ 1653 1654INTERNAL boolean32 cthread_call_dequeue 1655( 1656 rpc_call_rep_p_t call_rep 1657) 1658{ 1659 rpc_cthread_pvt_info_p_t pvt = &call_rep->u.server.cthread; 1660 cthread_queue_elt_p_t qelt = (cthread_queue_elt_p_t)pvt->qelt; 1661 cthread_pool_elt_p_t p; 1662 1663 RPC_MUTEX_LOCK_ASSERT (cthread_mutex); 1664 1665 /* 1666 * If call's not queued, were done. 1667 */ 1668 if (! pvt->is_queued) 1669 { 1670 return false; 1671 } 1672 1673 /* 1674 * Dequeue the call from the default pool. 1675 */ 1676 RPC_LIST_REMOVE (cthread_default_pool->call_queue, call_rep); 1677 1678 /* 1679 * The call may or may not been for the default pool. 1680 */ 1681 if (qelt == NULL) 1682 { 1683 /* 1684 * The call was for the default pool; adjust the 1685 * default pool queue count (see the cthread_pool_elt 1686 * description). 1687 */ 1688 p = cthread_default_pool; 1689 cthread_default_pool->n_queued--; 1690 } 1691 else 1692 { 1693 /* 1694 * The call was really for a reserved pool; 1695 * remove it from that queue too. 1696 */ 1697 p = qelt->pool; 1698 1699 assert (qelt->call_rep == call_rep); 1700 assert ((cthread_queue_elt_p_t)pvt->qelt == qelt); 1701 1702 RPC_LIST_REMOVE (p->call_queue, qelt); 1703 p->n_queued--; 1704 1705 /* 1706 * return the queue elt to its free list. 1707 */ 1708 qelt->call_rep = NULL; 1709 RPC_LIST_ADD_HEAD (p->free_queue, 1710 qelt, 1711 cthread_queue_elt_p_t); 1712 } 1713 1714 /* 1715 * The call is no longer queued. 1716 */ 1717 pvt->is_queued = false; 1718 pvt->qelt = NULL; 1719 1720 RPC_DBG_PRINTF (rpc_e_dbg_general, 5, 1721 ("(cthread_call_dequeue) pool %p (%d remain) call_rep %p\n", 1722 p, p->n_queued, call_rep)); 1723 1724 return true; 1725} 1726 1727/* 1728**++ 1729** 1730** ROUTINE NAME: rpc__cthread_init 1731** 1732** SCOPE: PRIVATE - declared in comcthd.h 1733** 1734** DESCRIPTION: 1735** 1736** Initialize the cthread package. 1737** 1738** INPUTS: none 1739** 1740** INPUTS/OUTPUTS: none 1741** 1742** OUTPUTS: 1743** 1744** status A value indicating the status of the routine. 1745** 1746** rpc_s_ok 1747** 1748** IMPLICIT INPUTS: none 1749** 1750** IMPLICIT OUTPUTS: none 1751** 1752** FUNCTION VALUE: none 1753** 1754** SIDE EFFECTS: none 1755** 1756**-- 1757**/ 1758 1759PRIVATE void rpc__cthread_init 1760( 1761 unsigned32 *status 1762) 1763{ 1764 CODING_ERROR (status); 1765 1766 RPC_MUTEX_INIT (cthread_mutex); 1767 1768 *status = rpc_s_ok; 1769} 1770 1771/* 1772**++ 1773** 1774** ROUTINE NAME: rpc_server_create_thread_pool 1775** 1776** SCOPE: PUBLIC - declared in rpcpvt.idl 1777** 1778** DESCRIPTION: 1779** 1780** Allocate the resources for a pool (rpc__cthread_pool_start() actually 1781** creates the threads). 1782** 1783** INPUTS: 1784** 1785** n_threads number of call threads in the pool 1786** 1787** INPUTS/OUTPUTS: none 1788** 1789** OUTPUTS: 1790** 1791** phandle Handle to the new pool. 1792** status A value indicating the status of the routine. 1793** 1794** rpc_s_ok 1795** 1796** IMPLICIT INPUTS: none 1797** 1798** IMPLICIT OUTPUTS: none 1799** 1800** FUNCTION VALUE: none 1801** 1802** SIDE EFFECTS: none 1803** 1804**-- 1805**/ 1806 1807PUBLIC void rpc_server_create_thread_pool 1808( 1809 unsigned32 n_threads, 1810 rpc_thread_pool_handle_t *phandle, 1811 unsigned32 *status 1812) 1813{ 1814 cthread_pool_elt_p_t p; 1815 1816 CODING_ERROR (status); 1817 RPC_VERIFY_INIT (); 1818 1819 *phandle = NULL; 1820 1821 RPC_MUTEX_LOCK (cthread_mutex); 1822 1823 p = cthread_pool_alloc(n_threads, false /* is_default_pool */, status); 1824 if (*status != rpc_s_ok) 1825 goto CLEANUP; 1826 1827 /* 1828 * Make the newly created pool "public". 1829 */ 1830 RPC_LIST_ADD_TAIL (cthread_reserved_pools, p, cthread_pool_elt_p_t); 1831 *phandle = (rpc_thread_pool_handle_t) p; 1832 1833 /* 1834 * Normally, reserved pools are started up when the default pool 1835 * gets started, as a consequence of calling rpc_server_listen. 1836 * However, if the default pool has already been started up, then 1837 * start up this reserved pool immediately so that it will be available 1838 * for handling calls. 1839 */ 1840 if (cthread_invoke_enabled) 1841 cthread_pool_start (p, status); 1842 1843CLEANUP: 1844 1845 RPC_MUTEX_UNLOCK (cthread_mutex); 1846} 1847 1848/* 1849**++ 1850** 1851** ROUTINE NAME: rpc_server_free_thread_pool 1852** 1853** SCOPE: PUBLIC - declared in rpcpvt.idl 1854** 1855** DESCRIPTION: 1856** 1857** Stop the pool's call threads and free the pool resources. 1858** 1859** INPUTS: 1860** 1861** phandle Pool to free 1862** wait_flag T => wait for threads to stop 1863** 1864** INPUTS/OUTPUTS: none 1865** 1866** OUTPUTS: 1867** 1868** status A value indicating the status of the routine. 1869** 1870** rpc_s_ok 1871** 1872** IMPLICIT INPUTS: none 1873** 1874** IMPLICIT OUTPUTS: none 1875** 1876** FUNCTION VALUE: none 1877** 1878** SIDE EFFECTS: none 1879** 1880**-- 1881**/ 1882 1883PUBLIC void rpc_server_free_thread_pool 1884( 1885 rpc_thread_pool_handle_t *phandle, 1886 boolean32 wait_flag, 1887 unsigned32 *status 1888) 1889{ 1890 cthread_pool_elt_p_t p = (cthread_pool_elt_p_t) *phandle; 1891 1892 CODING_ERROR (status); 1893 RPC_VERIFY_INIT (); 1894 1895 RPC_MUTEX_LOCK (cthread_mutex); 1896 1897 /* 1898 * Remove the pool from the set of reserved pools. 1899 * For all practical external purposes, the reserved pool 1900 * no longer exists (though its cthreads may be still executing 1901 * their current (and queued) calls. 1902 */ 1903 RPC_LIST_REMOVE (cthread_reserved_pools, p); 1904 1905 /* 1906 * Stop the pool's threads (waiting as directed). 1907 */ 1908 cthread_pool_stop(p, wait_flag, status); 1909 1910 /* 1911 * If we waited for the pool to become idle we can immediately free it; 1912 * otherwise we've got to queue it for eventual freeing (and start up the 1913 * reaper timer if this is the first item being queued). 1914 */ 1915 if (wait_flag || p->ctbl == NULL) 1916 { 1917 unsigned32 st; 1918 cthread_pool_free(p, &st); 1919 } 1920 else 1921 { 1922 if (RPC_LIST_EMPTY(cthread_reaper_queue)) 1923 { 1924 rpc__timer_set(&cthread_reaper_timer, 1925 cthread_reaper, NULL, RPC_C_CTHREAD_REAPER_FREQ); 1926 } 1927 RPC_LIST_ADD_TAIL (cthread_reaper_queue, p, cthread_pool_elt_p_t); 1928 } 1929 1930 *phandle = NULL; 1931 1932 /* 1933CLEANUP: 1934*/ 1935 RPC_MUTEX_UNLOCK (cthread_mutex); 1936} 1937 1938/* 1939**++ 1940** 1941** ROUTINE NAME: rpc_server_set_thread_pool_fn 1942** 1943** SCOPE: PUBLIC - declared in rpcpvt.idl 1944** 1945** DESCRIPTION: 1946** 1947** [Un]Register a thread pool lookup function with the runtime. 1948** 1949** INPUTS: 1950** 1951** pool_fn the lookup function - may be NULL 1952** 1953** INPUTS/OUTPUTS: none 1954** 1955** OUTPUTS: 1956** 1957** status A value indicating the status of the routine. 1958** 1959** rpc_s_ok 1960** 1961** IMPLICIT INPUTS: none 1962** 1963** IMPLICIT OUTPUTS: none 1964** 1965** FUNCTION VALUE: none 1966** 1967** SIDE EFFECTS: none 1968** 1969**-- 1970**/ 1971 1972PUBLIC void rpc_server_set_thread_pool_fn 1973( 1974 rpc_thread_pool_fn_t pool_fn, 1975 unsigned32 *status 1976) 1977{ 1978 CODING_ERROR (status); 1979 RPC_VERIFY_INIT (); 1980 1981 RPC_MUTEX_LOCK (cthread_mutex); 1982 1983 if (pool_fn != NULL && cthread_pool_lookup_fn != NULL) 1984 { 1985 *status = -1; /* !!! already set */ 1986 goto CLEANUP; 1987 } 1988 1989 cthread_pool_lookup_fn = pool_fn; /* be it NULL or otherwise */ 1990 *status = rpc_s_ok; 1991 1992CLEANUP: 1993 1994 RPC_MUTEX_UNLOCK (cthread_mutex); 1995} 1996 1997/* 1998**++ 1999** 2000** ROUTINE NAME: rpc_server_set_thread_qlen 2001** 2002** SCOPE: PUBLIC - declared in rpcpvt.idl 2003** 2004** DESCRIPTION: 2005** 2006** Adjust the maximum number of queued calls for a specified thread pool. 2007** 2008** INPUTS: 2009** 2010** phandle the pool whose queue size is being adjusted 2011** a NULL argument can be used to specify that the 2012** the operation should be applied to the default pool. 2013** queue_size the new size 2014** 2015** INPUTS/OUTPUTS: none 2016** 2017** OUTPUTS: 2018** 2019** status A value indicating the status of the routine. 2020** 2021** rpc_s_ok 2022** 2023** IMPLICIT INPUTS: none 2024** 2025** IMPLICIT OUTPUTS: none 2026** 2027** FUNCTION VALUE: none 2028** 2029** SIDE EFFECTS: none 2030** 2031**-- 2032**/ 2033 2034PUBLIC void rpc_server_set_thread_pool_qlen 2035( 2036 rpc_thread_pool_handle_t phandle, 2037 unsigned32 queue_size, 2038 unsigned32 * status 2039) 2040{ 2041 cthread_pool_elt_p_t pool = (cthread_pool_elt_p_t) phandle; 2042 2043 CODING_ERROR (status); 2044 RPC_VERIFY_INIT (); 2045 2046 RPC_MUTEX_LOCK (cthread_mutex); 2047 2048 *status = rpc_s_ok; 2049 2050 /* 2051 * If the caller sent NULL as the pool parameter, apply the operation 2052 * to the default pool. 2053 */ 2054 if (pool == NULL) 2055 { 2056 cthread_default_call_queue_size = queue_size; 2057 2058 /* 2059 * If the default pool hasn't been started yet, we're done; the 2060 * global value will be used when it does get started up. If the 2061 * default pool *has* been started, just update its max_queued 2062 * value. 2063 */ 2064 if (cthread_default_pool != NULL) 2065 { 2066 cthread_default_pool->max_queued = queue_size; 2067 } 2068 } 2069 else 2070 { 2071 unsigned32 i; 2072 2073 /* 2074 * We're operating on a private pool... 2075 * 2076 * If this pool has not been started yet, just record the value for 2077 * the max queue size. The actual queue element data structure will 2078 * get created when the pool is started. 2079 */ 2080 if (RPC_LIST_EMPTY(pool->free_queue)) 2081 { 2082 pool->max_queued = queue_size; 2083 } 2084 else 2085 { 2086 /* 2087 * This private pool has already been started. 2088 * 2089 * Considering that calls may currently be queued for this pool, it 2090 * would be extremely tricky, not to mention probably not useful, to 2091 * allow the caller to shrink the call queue length. Only update the 2092 * queue length if it's being increased. 2093 */ 2094 2095 if (queue_size > pool->max_queued) 2096 { 2097 /* 2098 * Alloc up some more queue elements, and add them to the list. 2099 */ 2100 for (i = pool->max_queued; i < queue_size; i++) 2101 { 2102 cthread_queue_elt_p_t qe; 2103 2104 RPC_MEM_ALLOC (qe, 2105 cthread_queue_elt_p_t, 2106 sizeof (cthread_queue_elt_t), 2107 RPC_C_MEM_CTHREAD_QETBL, 2108 RPC_C_MEM_WAITOK); 2109 2110 if (qe == NULL) 2111 { 2112 *status = rpc_s_no_memory; 2113 2114 /* 2115 * Try to stay calm... 2116 */ 2117 pool->max_queued = i; 2118 2119 RPC_MUTEX_UNLOCK (cthread_mutex); 2120 return; 2121 } 2122 2123 qe->pool = pool; 2124 RPC_LIST_ADD_TAIL (pool->free_queue, qe, cthread_queue_elt_p_t); 2125 } 2126 2127 pool->max_queued = queue_size; 2128 } 2129 } 2130 } 2131 2132 RPC_MUTEX_UNLOCK (cthread_mutex); 2133} 2134 2135/* 2136**++ 2137** 2138** ROUTINE NAME: rpc__cthread_start_all 2139** 2140** SCOPE: PRIVATE - declared in comcthd.h 2141** 2142** DESCRIPTION: 2143** 2144** Arrange for all the call execution threads to be created and 2145** enabled RPC execution. 2146** 2147** INPUTS: 2148** 2149** default_cthreads The number of default pool call threads which will be 2150** created 2151** 2152** INPUTS/OUTPUTS: none 2153** 2154** OUTPUTS: 2155** 2156** status A value indicating the status of the routine. 2157** 2158** rpc_s_ok 2159** rpc_s_no_memory 2160** 2161** IMPLICIT INPUTS: none 2162** 2163** IMPLICIT OUTPUTS: none 2164** 2165** FUNCTION VALUE: none 2166** 2167** SIDE EFFECTS: none 2168** 2169**-- 2170**/ 2171 2172PRIVATE void rpc__cthread_start_all 2173( 2174 unsigned32 default_pool_cthreads, 2175 unsigned32 *status 2176) 2177{ 2178 cthread_pool_elt_p_t p; 2179 2180 CODING_ERROR (status); 2181 2182 RPC_MUTEX_LOCK (cthread_mutex); 2183 2184 /* 2185 * Alloc the default pool if necessary (or just adjust its 2186 * thread count). 2187 */ 2188 if (cthread_default_pool == NULL) 2189 { 2190 cthread_default_pool = cthread_pool_alloc ( 2191 default_pool_cthreads, 2192 true, /* is_default_pool */ 2193 status); 2194 if (*status != rpc_s_ok) 2195 goto CLEANUP; 2196 } 2197 else 2198 { 2199 cthread_pool_set_threadcnt(cthread_default_pool, 2200 default_pool_cthreads, status); 2201 if (*status != rpc_s_ok) 2202 goto CLEANUP; 2203 } 2204 2205 /* 2206 * Fire up all of the call executor threads. 2207 */ 2208 cthread_pool_start (cthread_default_pool, status); 2209 if (*status != rpc_s_ok) 2210 goto CLEANUP; 2211 2212 RPC_LIST_FIRST (cthread_reserved_pools, p, cthread_pool_elt_p_t); 2213 while (p != NULL) 2214 { 2215 cthread_pool_start (p, status); 2216 if (*status != rpc_s_ok) 2217 goto CLEANUP; 2218 RPC_LIST_NEXT (p, p, cthread_pool_elt_p_t); 2219 } 2220 2221 /* 2222 * enable RPC queuing / execution 2223 */ 2224 cthread_invoke_enabled = true; 2225 2226 *status = rpc_s_ok; 2227 2228CLEANUP: 2229 2230 RPC_MUTEX_UNLOCK (cthread_mutex); 2231} 2232 2233/* 2234**++ 2235** 2236** ROUTINE NAME: rpc__cthread_stop_all 2237** 2238** SCOPE: PRIVATE - declared in comcthd.h 2239** 2240** DESCRIPTION: 2241** 2242** Stop all the call executor threads. Don't return until all have stopped. 2243** 2244** INPUTS: none 2245** 2246** INPUTS/OUTPUTS: none 2247** 2248** OUTPUTS: 2249** 2250** status A value indicating the status of the routine. 2251** 2252** 2253** IMPLICIT INPUTS: none 2254** 2255** IMPLICIT OUTPUTS: none 2256** 2257** FUNCTION VALUE: none 2258** 2259** SIDE EFFECTS: none 2260** 2261**-- 2262**/ 2263 2264PRIVATE void rpc__cthread_stop_all 2265( 2266 unsigned32 *status 2267) 2268{ 2269 2270 cthread_pool_elt_p_t p; 2271 2272 CODING_ERROR (status); 2273 2274 RPC_MUTEX_LOCK (cthread_mutex); 2275 2276 /* 2277 * Disable subsequent call execution processing while we're 2278 * waiting for the executors to complete. 2279 */ 2280 2281 cthread_invoke_enabled = false; 2282 2283 /* 2284 * Tell each pool to stop. 2285 */ 2286 cthread_pool_stop(cthread_default_pool, false, status); 2287 if (*status != rpc_s_ok) 2288 goto CLEANUP; 2289 2290 RPC_LIST_FIRST (cthread_reserved_pools, p, cthread_pool_elt_p_t); 2291 while (p != NULL) 2292 { 2293 cthread_pool_stop(p, false, status); 2294 if (*status != rpc_s_ok) 2295 goto CLEANUP; 2296 RPC_LIST_NEXT (p, p, cthread_pool_elt_p_t); 2297 } 2298 2299 /* 2300 * Now wait for each pool's threads to complete. 2301 */ 2302 cthread_pool_stop(cthread_default_pool, true, status); 2303 if (*status != rpc_s_ok) 2304 goto CLEANUP; 2305 2306 RPC_LIST_FIRST (cthread_reserved_pools, p, cthread_pool_elt_p_t); 2307 while (p != NULL) 2308 { 2309 cthread_pool_stop(p, true, status); 2310 if (*status != rpc_s_ok) 2311 goto CLEANUP; 2312 RPC_LIST_NEXT (p, p, cthread_pool_elt_p_t); 2313 } 2314 2315 *status = rpc_s_ok; 2316 2317CLEANUP: 2318 2319 RPC_MUTEX_UNLOCK (cthread_mutex); 2320} 2321 2322 2323/* 2324**++ 2325** 2326** ROUTINE NAME: rpc__cthread_invoke_null 2327** 2328** SCOPE: PRIVATE - declared in comcthd.h 2329** 2330** DESCRIPTION: 2331** 2332** Arrange for a call execution thread to (eventually) be allocated to 2333** "execute" the RPC. 2334** 2335** INPUTS: none 2336** 2337** INPUTS/OUTPUTS: 2338** 2339** call_rep The call rep for the incoming call. 2340** call_executor The address of a routine to be called when the 2341** call thread actually wakes up 2342** args A pointer to be passed to the called routine 2343** 2344** OUTPUTS: 2345** 2346** status A value indicating the status of the routine. 2347** 2348** rpc_s_ok 2349** rpc_s_cthread_not_found 2350** rpc_s_call_queued 2351** 2352** IMPLICIT INPUTS: none 2353** 2354** IMPLICIT OUTPUTS: none 2355** 2356** FUNCTION VALUE: void 2357** 2358** SIDE EFFECTS: 2359** call may be queued if no available call executors 2360** 2361**-- 2362**/ 2363 2364PRIVATE void rpc__cthread_invoke_null 2365( 2366 rpc_call_rep_p_t call_rep, 2367 uuid_p_t object, 2368 uuid_p_t if_uuid, 2369 unsigned32 if_ver, 2370 unsigned32 if_opnum, 2371 rpc_prot_cthread_executor_fn_t cthread_executor, 2372 dce_pointer_t args, 2373 unsigned32 *status 2374) 2375{ 2376 rpc_cthread_pvt_info_p_t pvt = &call_rep->u.server.cthread; 2377 unsigned32 lookup_fn_st; 2378 cthread_pool_elt_p_t p; 2379 cthread_elt_p_t cthread; 2380 2381 CODING_ERROR (status); 2382 2383 RPC_MUTEX_LOCK (cthread_mutex); 2384 2385 /* 2386 * Check to ensure that it's still desireable to queue/execute a call. 2387 * 2388 * While strictly speaking we need to examine cthread_invoke_enabled 2389 * under a mutex, we really don't want to pay the cost in this critical 2390 * path and I think things will work reasonably safely get by without it. 2391 * The worst that will happen is that a (couple) extra call(s) will be 2392 * allowed to be queued / executed during shutdown processing. 2393 */ 2394 if (cthread_invoke_enabled == false) 2395 { 2396 *status = rpc_s_cthread_invoke_disabled; 2397 goto CLEANUP; 2398 } 2399 2400 /* 2401 * Setup fields in the call rep for subsequent execution. 2402 */ 2403 pvt->executor = cthread_executor; 2404 pvt->optargs = args; 2405 2406 /* 2407 * Attempt to locate / assign an idle thread (this code is 2408 * in-line because this is the fast-path). 2409 */ 2410 CTHREAD_POOL_LOOKUP_RESERVED(object, if_uuid, if_ver, if_opnum, 2411 &p, &lookup_fn_st); 2412 if (lookup_fn_st != 0) 2413 { 2414 *status = rpc_s_cthread_not_found; 2415 goto CLEANUP; 2416 } 2417 2418 if (p == NULL) 2419 { 2420 /* 2421 * Only concerned with default pool. 2422 */ 2423 p = cthread_default_pool; 2424 2425 CTHREAD_POOL_ASSIGN_THREAD(cthread_default_pool, &cthread); 2426 } 2427 else 2428 { 2429 /* 2430 * First assign an idle reserved pool thread; otherwise, 2431 * assign an idle default pool thread. 2432 */ 2433 CTHREAD_POOL_ASSIGN_THREAD(p, &cthread); 2434 if (cthread == NULL) 2435 { 2436 CTHREAD_POOL_ASSIGN_THREAD(cthread_default_pool, &cthread); 2437 } 2438 } 2439 2440 /* 2441 * If we've succeeded in assigning a cthread, arrange for it to 2442 * actually execute the RPC. Otherwise, attempt to queue the RPC 2443 * for deferred execution. 2444 */ 2445 if (cthread != NULL) 2446 { 2447 /* 2448 * Setup fields in the call rep for subsequent execution. 2449 */ 2450 pvt->is_queued = false; 2451 pvt->thread_h = cthread->thread_id; 2452 cthread->call_rep = call_rep; 2453 2454 /* 2455 * Fire up the assigned cthread. 2456 */ 2457 RPC_COND_SIGNAL(cthread->thread_cond, cthread_mutex); 2458 2459 *status = rpc_s_ok; 2460 } 2461 else 2462 { 2463 cthread_pool_queue_call(p, call_rep, status); 2464 if (*status == rpc_s_ok) 2465 *status = rpc_s_call_queued; 2466 } 2467 2468CLEANUP: 2469 2470 RPC_MUTEX_UNLOCK (cthread_mutex); 2471} 2472 2473 2474/* 2475**++ 2476** 2477** ROUTINE NAME: rpc__cthread_dequeue 2478** 2479** SCOPE: PRIVATE - included in comcthd.h 2480** 2481** DESCRIPTION: Remove a call rep from the call executor 2482** thread waiting queue, if it's there. 2483** 2484** INPUTS: none 2485** 2486** INPUTS/OUTPUTS: 2487** 2488** call_rep The call rep to be dequeued. 2489** 2490** OUTPUTS: none 2491** 2492** IMPLICIT INPUTS: none 2493** 2494** IMPLICIT OUTPUTS: none 2495** 2496** FUNCTION VALUE: boolean 2497** 2498** T => call was previously queued. 2499** 2500** SIDE EFFECTS: none 2501** 2502**-- 2503**/ 2504 2505PRIVATE boolean32 rpc__cthread_dequeue 2506( 2507 rpc_call_rep_p_t call_rep 2508) 2509{ 2510 boolean32 was_dequeued; 2511 2512 RPC_MUTEX_LOCK (cthread_mutex); 2513 2514 was_dequeued = cthread_call_dequeue (call_rep); 2515 2516 RPC_MUTEX_UNLOCK (cthread_mutex); 2517 2518 return was_dequeued; 2519} 2520 2521 2522/* 2523**++ 2524** 2525** ROUTINE NAME: rpc__cthread_cancel 2526** 2527** SCOPE: PRIVATE - included in comcthd.h 2528** 2529** DESCRIPTION: Post a cancel to cthread associated with a call 2530** 2531** INPUTS: none 2532** 2533** INPUTS/OUTPUTS: 2534** 2535** call The call that the cancel is associated with. 2536** 2537** OUTPUTS: none 2538** 2539** IMPLICIT INPUTS: none 2540** 2541** IMPLICIT OUTPUTS: none 2542** 2543** FUNCTION VALUE: void 2544** 2545** SIDE EFFECTS: 2546** a cancel may be posted to the call execution thread 2547** 2548**-- 2549**/ 2550 2551PRIVATE void rpc__cthread_cancel 2552( 2553 rpc_call_rep_p_t call 2554) 2555{ 2556 RPC_CALL_LOCK_ASSERT(call); 2557 2558 if (!call->u.server.cancel.accepting) 2559 return; 2560 2561 call->u.server.cancel.count++; 2562 2563 if (!call->u.server.cancel.queuing) 2564 { 2565 rpc_cthread_pvt_info_p_t pvt = &call->u.server.cthread; 2566 2567 RPC_MUTEX_LOCK (cthread_mutex); 2568 2569 dcethread_interrupt_throw(pvt->thread_h); 2570 2571 RPC_MUTEX_UNLOCK (cthread_mutex); 2572 } 2573} 2574 2575/* 2576**++ 2577** 2578** ROUTINE NAME: rpc__cthread_cancel_caf 2579** 2580** SCOPE: PRIVATE - included in comcthd.h 2581** 2582** DESCRIPTION: Check for pending cancel and flush. 2583** 2584** INPUTS: none 2585** 2586** INPUTS/OUTPUTS: 2587** 2588** call The call that the cancel is associated with. 2589** 2590** OUTPUTS: none 2591** 2592** IMPLICIT INPUTS: none 2593** 2594** IMPLICIT OUTPUTS: none 2595** 2596** FUNCTION VALUE: 2597** boolean32 => T iff call had a pending cancel 2598** 2599** SIDE EFFECTS: 2600** the call will no longer accept cancels 2601** any pending cancels will be flushed (i.e. the 2602** call thread must not have any residual pending 2603** cancels upon completion) 2604** 2605**-- 2606**/ 2607 2608PRIVATE boolean32 rpc__cthread_cancel_caf 2609( 2610 rpc_call_rep_p_t call 2611) 2612{ 2613 int oc; 2614 2615 RPC_CALL_LOCK_ASSERT(call); 2616 2617 /* 2618 * In the event this is called multiple times, return something 2619 * sensible (i.e. return the current "had pending" state). 2620 */ 2621 if (!call->u.server.cancel.accepting) 2622 { 2623 return (call->u.server.cancel.had_pending); 2624 } 2625 2626 /* 2627 * Cancels are no longer accepted by this call. 2628 */ 2629 call->u.server.cancel.accepting = false; 2630 2631 /* 2632 * Determine if the call has a cancel pending (flush any accepted 2633 * cancels). Only want to take the expensive path if a cancel request 2634 * had been previously accepted. 2635 */ 2636 call->u.server.cancel.had_pending = false; 2637 if (call->u.server.cancel.count) 2638 { 2639#ifndef _PTHREAD_NO_CANCEL_SUPPORT 2640 oc = dcethread_enableinterrupt_throw(1); 2641 DCETHREAD_TRY 2642 { 2643 dcethread_checkinterrupt(); 2644 } 2645 DCETHREAD_CATCH(dcethread_interrupt_e) 2646 { 2647 call->u.server.cancel.had_pending = true; 2648 } 2649 DCETHREAD_ENDTRY 2650 dcethread_enableinterrupt_throw(oc); 2651#else 2652 /* 2653 * Cancels not supported, so the previously accepted forwarded 2654 * cancels are still pending. 2655 */ 2656 call->u.server.cancel.had_pending = true; 2657#endif 2658 } 2659 call->u.server.cancel.count = 0; 2660 2661 /* 2662 * Let the caller know if a cancel was pending (without them having 2663 * to look at the flag). 2664 */ 2665 return (call->u.server.cancel.had_pending); 2666} 2667 2668/* 2669**++ 2670** 2671** ROUTINE NAME: rpc__cthread_cancel_enable_post 2672** 2673** SCOPE: PRIVATE - included in comcthd.h 2674** 2675** DESCRIPTION: Enable direct posting of cancels to a cthread; 2676** post any previously queued cancels. 2677** 2678** INPUTS: none 2679** 2680** INPUTS/OUTPUTS: 2681** 2682** call The call that the cancel is associated with. 2683** 2684** OUTPUTS: none 2685** 2686** IMPLICIT INPUTS: none 2687** 2688** IMPLICIT OUTPUTS: none 2689** 2690** FUNCTION VALUE: void 2691** 2692** SIDE EFFECTS: 2693** a cancel may be posted to the call execution thread 2694** 2695**-- 2696**/ 2697 2698PRIVATE void rpc__cthread_cancel_enable_post 2699( 2700 rpc_call_rep_p_t call 2701) 2702{ 2703 rpc_cthread_pvt_info_p_t pvt = &call->u.server.cthread; 2704 unsigned16 cancel_cnt; 2705 2706 RPC_CALL_LOCK_ASSERT(call); 2707 2708 RPC_MUTEX_LOCK (cthread_mutex); 2709 2710 if (call->u.server.cancel.accepting && call->u.server.cancel.queuing) 2711 { 2712 call->u.server.cancel.queuing = false; 2713 for (cancel_cnt = call->u.server.cancel.count; cancel_cnt--; ) 2714 { 2715 dcethread_interrupt_throw(pvt->thread_h); 2716 } 2717 } 2718 2719 RPC_MUTEX_UNLOCK (cthread_mutex); 2720} 2721