/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#define __KERNEL_SYSCALLS__
#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/unistd.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
#include <linux/spinlock.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xprt.h>

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
static int			rpc_task_id;
#endif

/*
 * We give RPC the same get_free_pages priority as NFS
 */
#define GFP_RPC			GFP_NOFS

static void			__rpc_default_timer(struct rpc_task *task);
static void			rpciod_killall(void);

/*
 * When an asynchronous RPC task is activated within a bottom half
 * handler, or while executing another RPC task, it is put on
 * schedq, and rpciod is woken up.
 */
static RPC_WAITQ(schedq, "schedq");

/*
 * RPC tasks that create another task (e.g. for contacting the portmapper)
 * will wait on this queue for their child's completion
 */
static RPC_WAITQ(childq, "childq");

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static RPC_WAITQ(delay_queue, "delayq");

/*
 * All RPC tasks are linked into this list
 */
static LIST_HEAD(all_tasks);

/*
 * rpciod-related stuff
 */
static DECLARE_WAIT_QUEUE_HEAD(rpciod_idle);
static DECLARE_WAIT_QUEUE_HEAD(rpciod_killer);
static DECLARE_MUTEX(rpciod_sema);
static unsigned int		rpciod_users;
static pid_t			rpciod_pid;
static int			rpc_inhibit;

/*
 * Spinlock for wait queues. Access to the latter also has to be
 * interrupt-safe in order to allow timers to wake up sleeping tasks.
 */
static spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
/*
 * Spinlock for other critical sections of code.
 */
static spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;

/*
 * This is the last-ditch buffer for NFS swap requests
 */
static u32			swap_buffer[PAGE_SIZE >> 2];
static long			swap_buffer_used;

/*
 * Make allocation of the swap_buffer SMP-safe
 */
static __inline__ int rpc_lock_swapbuf(void)
{
	return !test_and_set_bit(1, &swap_buffer_used);
}
static __inline__ void rpc_unlock_swapbuf(void)
{
	clear_bit(1, &swap_buffer_used);
}

/*
 * Disable the timer for a given RPC task. Should be called with
 * rpc_queue_lock and bh_disabled in order to avoid races within
 * rpc_run_timer().
 */
static inline void
__rpc_disable_timer(struct rpc_task *task)
{
	dprintk("RPC: %4d disabling timer\n", task->tk_pid);
	task->tk_timeout_fn = NULL;
	task->tk_timeout = 0;
}

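/*
 * Illustrative sketch, not part of the original file: the per-task
 * timer is armed by __rpc_add_timer() below and fired via
 * rpc_run_timer(). A hypothetical caller arming a five second
 * timeout with its own handler would, under rpc_queue_lock, do:
 *
 *	task->tk_timeout = 5 * HZ;
 *	__rpc_add_timer(task, my_timeout_handler);
 *
 * where my_timeout_handler is a made-up rpc_action; passing NULL
 * instead selects __rpc_default_timer().
 */
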
/*
 * Run a timeout function.
 * We use the callback in order to allow __rpc_wake_up_task()
 * and friends to disable the timer synchronously on SMP systems
 * without calling del_timer_sync(). The latter could cause a
 * deadlock if called while we're holding spinlocks...
 */
static void
rpc_run_timer(struct rpc_task *task)
{
	void (*callback)(struct rpc_task *);

	spin_lock_bh(&rpc_queue_lock);
	callback = task->tk_timeout_fn;
	task->tk_timeout_fn = NULL;
	spin_unlock_bh(&rpc_queue_lock);
	if (callback) {
		dprintk("RPC: %4d running timer\n", task->tk_pid);
		callback(task);
	}
}

/*
 * Set up a timer for the current task.
 */
static inline void
__rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %4d setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	if (timer)
		task->tk_timeout_fn = timer;
	else
		task->tk_timeout_fn = __rpc_default_timer;
	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
}

/*
 * Set up a timer for an already sleeping task.
 */
void rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
	spin_lock_bh(&rpc_queue_lock);
	if (!RPC_IS_RUNNING(task))
		__rpc_add_timer(task, timer);
	spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Delete any timer for the current task. Because we use del_timer_sync(),
 * this function should never be called while holding rpc_queue_lock.
 */
static inline void
rpc_delete_timer(struct rpc_task *task)
{
	if (timer_pending(&task->tk_timer)) {
		dprintk("RPC: %4d deleting timer\n", task->tk_pid);
		del_timer_sync(&task->tk_timer);
	}
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static inline int
__rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (task->tk_rpcwait == queue)
		return 0;

	if (task->tk_rpcwait) {
		printk(KERN_WARNING "RPC: doubly enqueued task!\n");
		return -EWOULDBLOCK;
	}
	if (RPC_IS_SWAPPER(task))
		list_add(&task->tk_list, &queue->tasks);
	else
		list_add_tail(&task->tk_list, &queue->tasks);
	task->tk_rpcwait = queue;

	dprintk("RPC: %4d added to queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));

	return 0;
}

int
rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
{
	int result;

	spin_lock_bh(&rpc_queue_lock);
	result = __rpc_add_wait_queue(q, task);
	spin_unlock_bh(&rpc_queue_lock);
	return result;
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static inline void
__rpc_remove_wait_queue(struct rpc_task *task)
{
	struct rpc_wait_queue *queue = task->tk_rpcwait;

	if (!queue)
		return;

	list_del(&task->tk_list);
	task->tk_rpcwait = NULL;

	dprintk("RPC: %4d removed from queue %p \"%s\"\n",
			task->tk_pid, queue, rpc_qname(queue));
}

void
rpc_remove_wait_queue(struct rpc_task *task)
{
	if (!task->tk_rpcwait)
		return;
	spin_lock_bh(&rpc_queue_lock);
	__rpc_remove_wait_queue(task);
	spin_unlock_bh(&rpc_queue_lock);
}

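/*
 * Queueing sketch, illustrative only: tasks move between wait queues
 * via the helpers above, always under rpc_queue_lock with bottom
 * halves disabled, e.g.:
 *
 *	spin_lock_bh(&rpc_queue_lock);
 *	__rpc_remove_wait_queue(task);
 *	__rpc_add_wait_queue(&delay_queue, task);
 *	spin_unlock_bh(&rpc_queue_lock);
 *
 * Swapper tasks jump to the head of the queue (list_add); everyone
 * else is appended FIFO (list_add_tail).
 */
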
/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static inline void
rpc_make_runnable(struct rpc_task *task)
{
	if (task->tk_timeout_fn) {
		printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n");
		return;
	}
	rpc_set_running(task);
	if (RPC_IS_ASYNC(task)) {
		if (RPC_IS_SLEEPING(task)) {
			int status;
			status = __rpc_add_wait_queue(&schedq, task);
			if (status < 0) {
				printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
				task->tk_status = status;
				return;
			}
			rpc_clear_sleeping(task);
			if (waitqueue_active(&rpciod_idle))
				wake_up(&rpciod_idle);
		}
	} else {
		rpc_clear_sleeping(task);
		if (waitqueue_active(&task->tk_wait))
			wake_up(&task->tk_wait);
	}
}

/*
 * Place a newly initialized task on the schedq.
 */
static inline void
rpc_schedule_run(struct rpc_task *task)
{
	/* Don't run a child twice! */
	if (RPC_IS_ACTIVATED(task))
		return;
	task->tk_active = 1;
	rpc_set_sleeping(task);
	rpc_make_runnable(task);
}

/*
 * For other people who may need to wake the I/O daemon
 * but should (for now) know nothing about its innards
 */
void rpciod_wake_up(void)
{
	if (rpciod_pid == 0)
		printk(KERN_ERR "rpciod: wot no daemon?\n");
	if (waitqueue_active(&rpciod_idle))
		wake_up(&rpciod_idle);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void
__rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action, rpc_action timer)
{
	int status;

	dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
				rpc_qname(q), jiffies);

	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
		return;
	}

	/* Mark the task as being activated if so needed */
	if (!RPC_IS_ACTIVATED(task)) {
		task->tk_active = 1;
		rpc_set_sleeping(task);
	}

	status = __rpc_add_wait_queue(q, task);
	if (status) {
		printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
		task->tk_status = status;
	} else {
		rpc_clear_running(task);
		if (task->tk_callback) {
			dprintk(KERN_ERR "RPC: %4d overwrites an active callback\n", task->tk_pid);
			BUG();
		}
		task->tk_callback = action;
		__rpc_add_timer(task, timer);
	}
}

void
rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action, rpc_action timer)
{
	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&rpc_queue_lock);
	__rpc_sleep_on(q, task, action, timer);
	spin_unlock_bh(&rpc_queue_lock);
}

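/*
 * Sleep/wake sketch, illustrative only (the field names here are
 * hypothetical, not from this file): a transport routine would
 * typically park a task on a wait queue and let an interrupt-driven
 * event wake it later, e.g.:
 *
 *	task->tk_timeout = req->rq_timeout;
 *	rpc_sleep_on(&some_queue, task, NULL, NULL);
 *
 * and, from the data-ready handler:
 *
 *	rpc_wake_up_task(task);
 */
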
/**
 * __rpc_wake_up_task - wake up a single rpc_task
 * @task: task to be woken up
 *
 * Caller must hold rpc_queue_lock
 */
static void
__rpc_wake_up_task(struct rpc_task *task)
{
	dprintk("RPC: %4d __rpc_wake_up_task (now %ld inh %d)\n",
					task->tk_pid, jiffies, rpc_inhibit);

#ifdef RPC_DEBUG
	if (task->tk_magic != 0xf00baa) {
		printk(KERN_ERR "RPC: attempt to wake up non-existing task!\n");
		rpc_debug = ~0;
		rpc_show_tasks();
		return;
	}
#endif
	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}
	if (RPC_IS_RUNNING(task))
		return;

	__rpc_disable_timer(task);
	if (task->tk_rpcwait != &schedq)
		__rpc_remove_wait_queue(task);

	rpc_make_runnable(task);

	dprintk("RPC: __rpc_wake_up_task done\n");
}

/*
 * Default timeout handler if none specified by user
 */
static void
__rpc_default_timer(struct rpc_task *task)
{
	dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
	task->tk_status = -ETIMEDOUT;
	rpc_wake_up_task(task);
}

/*
 * Wake up the specified task
 */
void
rpc_wake_up_task(struct rpc_task *task)
{
	if (RPC_IS_RUNNING(task))
		return;
	spin_lock_bh(&rpc_queue_lock);
	__rpc_wake_up_task(task);
	spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task *
rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	struct rpc_task	*task = NULL;

	dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
	spin_lock_bh(&rpc_queue_lock);
	task_for_first(task, &queue->tasks)
		__rpc_wake_up_task(task);
	spin_unlock_bh(&rpc_queue_lock);

	return task;
}

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs rpc_queue_lock
 */
void
rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct rpc_task *task;

	spin_lock_bh(&rpc_queue_lock);
	while (!list_empty(&queue->tasks))
		task_for_first(task, &queue->tasks)
			__rpc_wake_up_task(task);
	spin_unlock_bh(&rpc_queue_lock);
}

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs rpc_queue_lock
 */
void
rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct rpc_task *task;

	spin_lock_bh(&rpc_queue_lock);
	while (!list_empty(&queue->tasks)) {
		task_for_first(task, &queue->tasks) {
			task->tk_status = status;
			__rpc_wake_up_task(task);
		}
	}
	spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Run a task at a later time
 */
static void	__rpc_atrun(struct rpc_task *);
void
rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}

static void
__rpc_atrun(struct rpc_task *task)
{
	task->tk_status = 0;
	rpc_wake_up_task(task);
}

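/*
 * Backoff sketch, illustrative only: an FSM step that wants to wait
 * three seconds before retrying might set its next state and call
 * rpc_delay():
 *
 *	task->tk_action = my_retry_state;	(hypothetical next step)
 *	rpc_delay(task, 3 * HZ);
 *
 * __rpc_atrun() clears tk_status and wakes the task when the delay
 * expires.
 */
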
/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static int
__rpc_execute(struct rpc_task *task)
{
	int status = 0;

	dprintk("RPC: %4d rpc_execute flgs %x\n",
				task->tk_pid, task->tk_flags);

	if (!RPC_IS_RUNNING(task)) {
		printk(KERN_WARNING "RPC: rpc_execute called for sleeping task!!\n");
		return 0;
	}

 restarted:
	while (1) {
		/*
		 * Execute any pending callback.
		 */
		if (RPC_DO_CALLBACK(task)) {
			/* Define a callback save pointer */
			void (*save_callback)(struct rpc_task *);

			/*
			 * If a callback exists, save it, reset it,
			 * call it.
			 * The save is needed to stop from resetting
			 * another callback set within the callback handler
			 *   - Dave
			 */
			save_callback = task->tk_callback;
			task->tk_callback = NULL;
			save_callback(task);
		}

		/*
		 * Perform the next FSM step.
		 * tk_action may be NULL when the task has been killed
		 * by someone else.
		 */
		if (RPC_IS_RUNNING(task)) {
			/*
			 * Garbage collection of pending timers...
			 */
			rpc_delete_timer(task);
			if (!task->tk_action)
				break;
			task->tk_action(task);
		}

		/*
		 * Check whether task is sleeping.
		 */
		spin_lock_bh(&rpc_queue_lock);
		if (!RPC_IS_RUNNING(task)) {
			rpc_set_sleeping(task);
			if (RPC_IS_ASYNC(task)) {
				spin_unlock_bh(&rpc_queue_lock);
				return 0;
			}
		}
		spin_unlock_bh(&rpc_queue_lock);

		while (RPC_IS_SLEEPING(task)) {
			/* sync task: sleep here */
			dprintk("RPC: %4d sync task going to sleep\n",
							task->tk_pid);
			if (current->pid == rpciod_pid)
				printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");

			__wait_event(task->tk_wait, !RPC_IS_SLEEPING(task));
			dprintk("RPC: %4d sync task resuming\n", task->tk_pid);

			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			if (task->tk_client->cl_intr && signalled()) {
				dprintk("RPC: %4d got signal\n", task->tk_pid);
				task->tk_flags |= RPC_TASK_KILLED;
				rpc_exit(task, -ERESTARTSYS);
				rpc_wake_up_task(task);
			}
		}
	}

	if (task->tk_exit) {
		task->tk_exit(task);
		/* If tk_action is non-null, the user wants us to restart */
		if (task->tk_action) {
			if (!RPC_ASSASSINATED(task)) {
				/* Release RPC slot and buffer memory */
				if (task->tk_rqstp)
					xprt_release(task);
				if (task->tk_buffer) {
					rpc_free(task->tk_buffer);
					task->tk_buffer = NULL;
				}
				goto restarted;
			}
			printk(KERN_ERR "RPC: dead task tries to walk away.\n");
		}
	}

	dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
	status = task->tk_status;

	/* Release all resources associated with the task */
	rpc_release_task(task);

	return status;
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *	 released. In particular note that tk_release() will have
 *	 been called, so your task memory may have been freed.
 */
int
rpc_execute(struct rpc_task *task)
{
	int status = -EIO;
	if (rpc_inhibit) {
		printk(KERN_INFO "RPC: execution inhibited!\n");
		goto out_release;
	}

	status = -EWOULDBLOCK;
	if (task->tk_active) {
		printk(KERN_ERR "RPC: active task was run twice!\n");
		goto out_err;
	}

	task->tk_active = 1;
	rpc_set_running(task);
	return __rpc_execute(task);
 out_release:
	rpc_release_task(task);
 out_err:
	return status;
}

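/*
 * Execution sketch, illustrative only: a synchronous caller runs a
 * task to completion in its own context,
 *
 *	status = rpc_execute(task);
 *
 * and must not touch the task afterwards, since rpc_release_task()
 * has been called by then. Asynchronous tasks instead return to
 * rpciod, which loops over schedq in __rpc_schedule() below.
 */
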
/*
 * This is our own little scheduler for async RPC tasks.
 */
static void
__rpc_schedule(void)
{
	struct rpc_task	*task;
	int		count = 0;

	dprintk("RPC: rpc_schedule enter\n");
	while (1) {
		spin_lock_bh(&rpc_queue_lock);

		task_for_first(task, &schedq.tasks) {
			__rpc_remove_wait_queue(task);
			spin_unlock_bh(&rpc_queue_lock);

			__rpc_execute(task);
		} else {
			spin_unlock_bh(&rpc_queue_lock);
			break;
		}

		if (++count >= 200 || current->need_resched) {
			count = 0;
			schedule();
		}
	}
	dprintk("RPC: rpc_schedule leave\n");
}

/*
 * Allocate memory for RPC purpose.
 *
 * This is yet another tricky issue: For sync requests issued by
 * a user process, we want to make kmalloc sleep if there isn't
 * enough memory. Async requests should not sleep too excessively
 * because that will block rpciod (but that's not dramatic when
 * it's starved of memory anyway). Finally, swapout requests should
 * never sleep at all, and should not trigger another swap_out
 * request through kmalloc which would just increase memory contention.
 *
 * I hope the following gets it right, which gives async requests
 * a slight advantage over sync requests (good for writeback, debatable
 * for readahead):
 *
 * sync user requests:	GFP_KERNEL
 * async requests:	GFP_RPC		(== GFP_NOFS)
 * swap requests:	GFP_ATOMIC	(or new GFP_SWAPPER)
 */
void *
rpc_allocate(unsigned int flags, unsigned int size)
{
	u32	*buffer;
	int	gfp;

	if (flags & RPC_TASK_SWAPPER)
		gfp = GFP_ATOMIC;
	else if (flags & RPC_TASK_ASYNC)
		gfp = GFP_RPC;
	else
		gfp = GFP_KERNEL;

	do {
		if ((buffer = (u32 *) kmalloc(size, gfp)) != NULL) {
			dprintk("RPC: allocated buffer %p\n", buffer);
			return buffer;
		}
		if ((flags & RPC_TASK_SWAPPER) && size <= sizeof(swap_buffer)
		    && rpc_lock_swapbuf()) {
			dprintk("RPC: used last-ditch swap buffer\n");
			return swap_buffer;
		}
		if (flags & RPC_TASK_ASYNC)
			return NULL;
		yield();
	} while (!signalled());

	return NULL;
}

void
rpc_free(void *buffer)
{
	if (buffer != swap_buffer) {
		kfree(buffer);
		return;
	}
	rpc_unlock_swapbuf();
}

/*
 * Creation and deletion of RPC task structures
 */
inline void
rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
				rpc_action callback, int flags)
{
	memset(task, 0, sizeof(*task));
	init_timer(&task->tk_timer);
	task->tk_timer.data     = (unsigned long) task;
	task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
	task->tk_client = clnt;
	task->tk_flags  = flags;
	task->tk_exit   = callback;
	init_waitqueue_head(&task->tk_wait);
	if (current->uid != current->fsuid || current->gid != current->fsgid)
		task->tk_flags |= RPC_TASK_SETUID;

	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;
	task->tk_suid_retry = 1;

	/* Add to global list of all tasks */
	spin_lock(&rpc_sched_lock);
	list_add(&task->tk_task, &all_tasks);
	spin_unlock(&rpc_sched_lock);

	if (clnt)
		atomic_inc(&clnt->cl_users);

#ifdef RPC_DEBUG
	task->tk_magic = 0xf00baa;
	task->tk_pid = rpc_task_id++;
#endif
	dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
				current->pid);
}

static void
rpc_default_free_task(struct rpc_task *task)
{
	dprintk("RPC: %4d freeing task\n", task->tk_pid);
	rpc_free(task);
}

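/*
 * Allocation sketch, illustrative only: buffers obtained from
 * rpc_allocate() must be returned through rpc_free() rather than
 * kfree(), so that the shared last-ditch swap_buffer is unlocked
 * instead of freed:
 *
 *	buf = rpc_allocate(task->tk_flags, size);
 *	if (buf != NULL) {
 *		...
 *		rpc_free(buf);
 *	}
 */
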
/*
 * Create a new task for the specified client. We have to
 * clean up after an allocation failure, as the client may
 * have specified "oneshot".
 */
struct rpc_task *
rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
{
	struct rpc_task	*task;

	task = (struct rpc_task *) rpc_allocate(flags, sizeof(*task));
	if (!task)
		goto cleanup;

	rpc_init_task(task, clnt, callback, flags);

	/* Replace tk_release */
	task->tk_release = rpc_default_free_task;

	dprintk("RPC: %4d allocated task\n", task->tk_pid);
	task->tk_flags |= RPC_TASK_DYNAMIC;
out:
	return task;

cleanup:
	/* Check whether to release the client */
	if (clnt) {
		printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
			atomic_read(&clnt->cl_users), clnt->cl_oneshot);
		atomic_inc(&clnt->cl_users);	/* pretend we were used ... */
		rpc_release_client(clnt);
	}
	goto out;
}

void
rpc_release_task(struct rpc_task *task)
{
	dprintk("RPC: %4d release task\n", task->tk_pid);

#ifdef RPC_DEBUG
	if (task->tk_magic != 0xf00baa) {
		printk(KERN_ERR "RPC: attempt to release a non-existing task!\n");
		rpc_debug = ~0;
		rpc_show_tasks();
		return;
	}
#endif

	/* Remove from global task list */
	spin_lock(&rpc_sched_lock);
	list_del(&task->tk_task);
	spin_unlock(&rpc_sched_lock);

	/* Protect the execution below. */
	spin_lock_bh(&rpc_queue_lock);

	/* Disable timer to prevent zombie wakeup */
	__rpc_disable_timer(task);

	/* Remove from any wait queue we're still on */
	__rpc_remove_wait_queue(task);

	task->tk_active = 0;

	spin_unlock_bh(&rpc_queue_lock);

	/* Synchronously delete any running timer */
	rpc_delete_timer(task);

	/* Release resources */
	if (task->tk_rqstp)
		xprt_release(task);
	if (task->tk_msg.rpc_cred)
		rpcauth_unbindcred(task);
	if (task->tk_buffer) {
		rpc_free(task->tk_buffer);
		task->tk_buffer = NULL;
	}
	if (task->tk_client) {
		rpc_release_client(task->tk_client);
		task->tk_client = NULL;
	}

#ifdef RPC_DEBUG
	task->tk_magic = 0;
#endif
	if (task->tk_release)
		task->tk_release(task);
}

/**
 * rpc_find_parent - find the parent of a child task.
 * @child: child task
 *
 * Checks that the parent task is still sleeping on the
 * queue 'childq'. If so returns a pointer to the parent.
 * Upon failure returns NULL.
 *
 * Caller must hold rpc_queue_lock
 */
static inline struct rpc_task *
rpc_find_parent(struct rpc_task *child)
{
	struct rpc_task	*task, *parent;
	struct list_head *le;

	parent = (struct rpc_task *) child->tk_calldata;
	task_for_each(task, le, &childq.tasks)
		if (task == parent)
			return parent;

	return NULL;
}

static void
rpc_child_exit(struct rpc_task *child)
{
	struct rpc_task	*parent;

	spin_lock_bh(&rpc_queue_lock);
	if ((parent = rpc_find_parent(child)) != NULL) {
		parent->tk_status = child->tk_status;
		__rpc_wake_up_task(parent);
	}
	spin_unlock_bh(&rpc_queue_lock);
}

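/*
 * Lifecycle sketch, illustrative only: a dynamically allocated task
 * normally passes through
 *
 *	task = rpc_new_task(clnt, my_exit_fn, RPC_TASK_ASYNC);
 *	rpc_execute(task);
 *
 * (my_exit_fn is hypothetical) and is torn down by rpc_release_task()
 * above, whose final act is to call tk_release -- for dynamic tasks,
 * rpc_default_free_task().
 */
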
/*
 * Note: rpc_new_task releases the client after a failure.
 */
struct rpc_task *
rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
{
	struct rpc_task	*task;

	task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD);
	if (!task)
		goto fail;
	task->tk_exit = rpc_child_exit;
	task->tk_calldata = parent;
	return task;

fail:
	parent->tk_status = -ENOMEM;
	return NULL;
}

void
rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
{
	spin_lock_bh(&rpc_queue_lock);
	/* N.B. Is it possible for the child to have already finished? */
	__rpc_sleep_on(&childq, task, func, NULL);
	rpc_schedule_run(child);
	spin_unlock_bh(&rpc_queue_lock);
}

void
rpc_killall_tasks(struct rpc_clnt *clnt)
{
	struct rpc_task	*rovr;
	struct list_head *le;

	dprintk("RPC: killing all tasks for client %p\n", clnt);

	/*
	 * Spin lock all_tasks to prevent changes...
	 */
	spin_lock(&rpc_sched_lock);
	alltask_for_each(rovr, le, &all_tasks)
		if (!clnt || rovr->tk_client == clnt) {
			rovr->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(rovr, -EIO);
			rpc_wake_up_task(rovr);
		}
	spin_unlock(&rpc_sched_lock);
}

static DECLARE_MUTEX_LOCKED(rpciod_running);

static inline int
rpciod_task_pending(void)
{
	return !list_empty(&schedq.tasks);
}

/*
 * This is the rpciod kernel thread
 */
static int
rpciod(void *ptr)
{
	wait_queue_head_t *assassin = (wait_queue_head_t *) ptr;
	int		rounds = 0;

	MOD_INC_USE_COUNT;
	lock_kernel();
	/*
	 * Let our maker know we're running ...
	 */
	rpciod_pid = current->pid;
	up(&rpciod_running);

	daemonize();

	spin_lock_irq(&current->sigmask_lock);
	siginitsetinv(&current->blocked, sigmask(SIGKILL));
	recalc_sigpending(current);
	spin_unlock_irq(&current->sigmask_lock);

	strcpy(current->comm, "rpciod");

	dprintk("RPC: rpciod starting (pid %d)\n", rpciod_pid);
	while (rpciod_users) {
		if (signalled()) {
			rpciod_killall();
			flush_signals(current);
		}
		__rpc_schedule();

		if (++rounds >= 64) {	/* safeguard */
			schedule();
			rounds = 0;
		}

		if (!rpciod_task_pending()) {
			dprintk("RPC: rpciod back to sleep\n");
			wait_event_interruptible(rpciod_idle, rpciod_task_pending());
			dprintk("RPC: switch to rpciod\n");
			rounds = 0;
		}
	}

	dprintk("RPC: rpciod shutdown commences\n");
	if (!list_empty(&all_tasks)) {
		printk(KERN_ERR "rpciod: active tasks at shutdown?!\n");
		rpciod_killall();
	}

	rpciod_pid = 0;
	wake_up(assassin);

	dprintk("RPC: rpciod exiting\n");
	MOD_DEC_USE_COUNT;
	return 0;
}

static void
rpciod_killall(void)
{
	unsigned long flags;

	while (!list_empty(&all_tasks)) {
		current->sigpending = 0;
		rpc_killall_tasks(NULL);
		__rpc_schedule();
		if (!list_empty(&all_tasks)) {
			dprintk("rpciod_killall: waiting for tasks to exit\n");
			yield();
		}
	}

	spin_lock_irqsave(&current->sigmask_lock, flags);
	recalc_sigpending(current);
	spin_unlock_irqrestore(&current->sigmask_lock, flags);
}

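/*
 * Reference-count sketch, illustrative only: users bracket the
 * daemon's lifetime with the pair below, typically at mount and
 * unmount time:
 *
 *	error = rpciod_up();	(first caller spawns the thread)
 *	...
 *	rpciod_down();		(last caller signals it to exit)
 */
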
/*
 * Start up the rpciod process if it's not already running.
 */
int
rpciod_up(void)
{
	int error = 0;

	MOD_INC_USE_COUNT;
	down(&rpciod_sema);
	dprintk("rpciod_up: pid %d, users %d\n", rpciod_pid, rpciod_users);
	rpciod_users++;
	if (rpciod_pid)
		goto out;
	/*
	 * If there's no pid, we should be the first user.
	 */
	if (rpciod_users > 1)
		printk(KERN_WARNING "rpciod_up: no pid, %d users??\n", rpciod_users);
	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	error = kernel_thread(rpciod, &rpciod_killer, 0);
	if (error < 0) {
		printk(KERN_WARNING "rpciod_up: create thread failed, error=%d\n", error);
		rpciod_users--;
		goto out;
	}
	down(&rpciod_running);
	error = 0;
out:
	up(&rpciod_sema);
	MOD_DEC_USE_COUNT;
	return error;
}

void
rpciod_down(void)
{
	unsigned long flags;

	MOD_INC_USE_COUNT;
	down(&rpciod_sema);
	dprintk("rpciod_down pid %d sema %d\n", rpciod_pid, rpciod_users);
	if (rpciod_users) {
		if (--rpciod_users)
			goto out;
	} else
		printk(KERN_WARNING "rpciod_down: pid=%d, no users??\n", rpciod_pid);

	if (!rpciod_pid) {
		dprintk("rpciod_down: Nothing to do!\n");
		goto out;
	}

	kill_proc(rpciod_pid, SIGKILL, 1);
	/*
	 * Usually rpciod will exit very quickly, so we
	 * wait briefly before checking the process id.
	 */
	current->sigpending = 0;
	yield();
	/*
	 * Display a message if we're going to wait longer.
	 */
	while (rpciod_pid) {
		dprintk("rpciod_down: waiting for pid %d to exit\n", rpciod_pid);
		if (signalled()) {
			dprintk("rpciod_down: caught signal\n");
			break;
		}
		interruptible_sleep_on(&rpciod_killer);
	}
	spin_lock_irqsave(&current->sigmask_lock, flags);
	recalc_sigpending(current);
	spin_unlock_irqrestore(&current->sigmask_lock, flags);
out:
	up(&rpciod_sema);
	MOD_DEC_USE_COUNT;
}

#ifdef RPC_DEBUG
void rpc_show_tasks(void)
{
	struct list_head *le;
	struct rpc_task *t;

	spin_lock(&rpc_sched_lock);
	if (list_empty(&all_tasks)) {
		spin_unlock(&rpc_sched_lock);
		return;
	}
	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
		"-rpcwait -action- --exit--\n");
	alltask_for_each(t, le, &all_tasks)
		printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
			t->tk_pid, t->tk_msg.rpc_proc, t->tk_flags, t->tk_status,
			t->tk_client, t->tk_client->cl_prog,
			t->tk_rqstp, t->tk_timeout,
			t->tk_rpcwait ? rpc_qname(t->tk_rpcwait) : " <NULL> ",
			t->tk_action, t->tk_exit);
	spin_unlock(&rpc_sched_lock);
}
#endif