/*
 * linux/net/sunrpc/sched.c
 *
 * Scheduling for synchronous and asynchronous RPC requests.
 *
 * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
 *
 * TCP NFS related read + write fixes
 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 */

#include <linux/module.h>

#define __KERNEL_SYSCALLS__
#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/unistd.h>
#include <linux/smp.h>
#include <linux/smp_lock.h>
#include <linux/spinlock.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xprt.h>

#ifdef RPC_DEBUG
#define RPCDBG_FACILITY		RPCDBG_SCHED
static int			rpc_task_id;
#endif

/*
 * We give RPC the same get_free_pages priority as NFS
 */
#define GFP_RPC			GFP_NOFS

static void			__rpc_default_timer(struct rpc_task *task);
static void			rpciod_killall(void);

/*
 * When an asynchronous RPC task is activated within a bottom half
 * handler, or while executing another RPC task, it is put on
 * schedq, and rpciod is woken up.
 */
static RPC_WAITQ(schedq, "schedq");

/*
 * RPC tasks that create another task (e.g. for contacting the portmapper)
 * will wait on this queue for their child's completion
 */
static RPC_WAITQ(childq, "childq");

/*
 * RPC tasks sit here while waiting for conditions to improve.
 */
static RPC_WAITQ(delay_queue, "delayq");

/*
 * All RPC tasks are linked into this list
 */
static LIST_HEAD(all_tasks);

/*
 * rpciod-related stuff
 */
static DECLARE_WAIT_QUEUE_HEAD(rpciod_idle);
static DECLARE_WAIT_QUEUE_HEAD(rpciod_killer);
static DECLARE_MUTEX(rpciod_sema);
static unsigned int		rpciod_users;
static pid_t			rpciod_pid;
static int			rpc_inhibit;

/*
 * Spinlock for wait queues. Access to the latter also has to be
 * interrupt-safe in order to allow timers to wake up sleeping tasks.
 */
static spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
/*
 * Spinlock for other critical sections of code.
 */
static spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;

/*
 * This is the last-ditch buffer for NFS swap requests
 */
static u32			swap_buffer[PAGE_SIZE >> 2];
static long			swap_buffer_used;

/*
 * Make allocation of the swap_buffer SMP-safe
 */
static __inline__ int rpc_lock_swapbuf(void)
{
	return !test_and_set_bit(1, &swap_buffer_used);
}
static __inline__ void rpc_unlock_swapbuf(void)
{
	clear_bit(1, &swap_buffer_used);
}
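
/*
 * Note: rpc_lock_swapbuf() is a non-blocking trylock; it returns
 * non-zero only when the caller has just taken ownership of the
 * emergency buffer. Callers that fail to get it fall back to
 * kmalloc() (see rpc_allocate() below).
 */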

/*
 * Disable the timer for a given RPC task. Should be called with
 * rpc_queue_lock held and bottom halves disabled in order to avoid
 * races with rpc_run_timer().
 */
static inline void
__rpc_disable_timer(struct rpc_task *task)
{
	dprintk("RPC: %4d disabling timer\n", task->tk_pid);
	task->tk_timeout_fn = NULL;
	task->tk_timeout = 0;
}

/*
 * Run a timeout function.
 * We use the callback in order to allow __rpc_wake_up_task()
 * and friends to disable the timer synchronously on SMP systems
 * without calling del_timer_sync(). The latter could cause a
 * deadlock if called while we're holding spinlocks...
 */
static void
rpc_run_timer(struct rpc_task *task)
{
	void (*callback)(struct rpc_task *);

	spin_lock_bh(&rpc_queue_lock);
	callback = task->tk_timeout_fn;
	task->tk_timeout_fn = NULL;
	spin_unlock_bh(&rpc_queue_lock);
	if (callback) {
		dprintk("RPC: %4d running timer\n", task->tk_pid);
		callback(task);
	}
}

/*
 * Set up a timer for the current task.
 */
static inline void
__rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
	if (!task->tk_timeout)
		return;

	dprintk("RPC: %4d setting alarm for %lu ms\n",
			task->tk_pid, task->tk_timeout * 1000 / HZ);

	if (timer)
		task->tk_timeout_fn = timer;
	else
		task->tk_timeout_fn = __rpc_default_timer;
	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
}

/*
 * Set up a timer for an already sleeping task.
 */
void rpc_add_timer(struct rpc_task *task, rpc_action timer)
{
	spin_lock_bh(&rpc_queue_lock);
	if (!RPC_IS_RUNNING(task))
		__rpc_add_timer(task, timer);
	spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Delete any timer for the current task. Because we use del_timer_sync(),
 * this function should never be called while holding rpc_queue_lock.
 */
static inline void
rpc_delete_timer(struct rpc_task *task)
{
	if (timer_pending(&task->tk_timer)) {
		dprintk("RPC: %4d deleting timer\n", task->tk_pid);
		del_timer_sync(&task->tk_timer);
	}
}

/*
 * Add new request to wait queue.
 *
 * Swapper tasks always get inserted at the head of the queue.
 * This should avoid many nasty memory deadlocks and hopefully
 * improve overall performance.
 * Everyone else gets appended to the queue to ensure proper FIFO behavior.
 */
static inline int
__rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (task->tk_rpcwait == queue)
		return 0;

	if (task->tk_rpcwait) {
		printk(KERN_WARNING "RPC: doubly enqueued task!\n");
		return -EWOULDBLOCK;
	}
	if (RPC_IS_SWAPPER(task))
		list_add(&task->tk_list, &queue->tasks);
	else
		list_add_tail(&task->tk_list, &queue->tasks);
	task->tk_rpcwait = queue;

	dprintk("RPC: %4d added to queue %p \"%s\"\n",
				task->tk_pid, queue, rpc_qname(queue));

	return 0;
}

int
rpc_add_wait_queue(struct rpc_wait_queue *q, struct rpc_task *task)
{
	int		result;

	spin_lock_bh(&rpc_queue_lock);
	result = __rpc_add_wait_queue(q, task);
	spin_unlock_bh(&rpc_queue_lock);
	return result;
}

/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */
static inline void
__rpc_remove_wait_queue(struct rpc_task *task)
{
	struct rpc_wait_queue *queue = task->tk_rpcwait;

	if (!queue)
		return;

	list_del(&task->tk_list);
	task->tk_rpcwait = NULL;

	dprintk("RPC: %4d removed from queue %p \"%s\"\n",
				task->tk_pid, queue, rpc_qname(queue));
}

void
rpc_remove_wait_queue(struct rpc_task *task)
{
	if (!task->tk_rpcwait)
		return;
	spin_lock_bh(&rpc_queue_lock);
	__rpc_remove_wait_queue(task);
	spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Make an RPC task runnable.
 *
 * Note: If the task is ASYNC, this must be called with
 * the spinlock held to protect the wait queue operation.
 */
static inline void
rpc_make_runnable(struct rpc_task *task)
{
	if (task->tk_timeout_fn) {
		printk(KERN_ERR "RPC: task w/ running timer in rpc_make_runnable!!\n");
		return;
	}
	rpc_set_running(task);
	if (RPC_IS_ASYNC(task)) {
		if (RPC_IS_SLEEPING(task)) {
			int status;
			status = __rpc_add_wait_queue(&schedq, task);
			if (status < 0) {
				printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
				task->tk_status = status;
				return;
			}
			rpc_clear_sleeping(task);
			if (waitqueue_active(&rpciod_idle))
				wake_up(&rpciod_idle);
		}
	} else {
		rpc_clear_sleeping(task);
		if (waitqueue_active(&task->tk_wait))
			wake_up(&task->tk_wait);
	}
}

/*
 * Place a newly initialized task on the schedq.
 */
static inline void
rpc_schedule_run(struct rpc_task *task)
{
	/* Don't run a child twice! */
	if (RPC_IS_ACTIVATED(task))
		return;
	task->tk_active = 1;
	rpc_set_sleeping(task);
	rpc_make_runnable(task);
}

/*
 *	For other people who may need to wake the I/O daemon
 *	but should (for now) know nothing about its innards
 */
void rpciod_wake_up(void)
{
	if (rpciod_pid == 0)
		printk(KERN_ERR "rpciod: wot no daemon?\n");
	if (waitqueue_active(&rpciod_idle))
		wake_up(&rpciod_idle);
}

/*
 * Prepare for sleeping on a wait queue.
 * By always appending tasks to the list we ensure FIFO behavior.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void
__rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
			rpc_action action, rpc_action timer)
{
	int status;

	dprintk("RPC: %4d sleep_on(queue \"%s\" time %ld)\n", task->tk_pid,
				rpc_qname(q), jiffies);

	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
		return;
	}

	/* Mark the task as being activated if so needed */
	if (!RPC_IS_ACTIVATED(task)) {
		task->tk_active = 1;
		rpc_set_sleeping(task);
	}

	status = __rpc_add_wait_queue(q, task);
	if (status) {
		printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
		task->tk_status = status;
	} else {
		rpc_clear_running(task);
		if (task->tk_callback) {
			dprintk(KERN_ERR "RPC: %4d overwrites an active callback\n", task->tk_pid);
			BUG();
		}
		task->tk_callback = action;
		__rpc_add_timer(task, timer);
	}
}

void
rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
				rpc_action action, rpc_action timer)
{
	/*
	 * Protect the queue operations.
	 */
	spin_lock_bh(&rpc_queue_lock);
	__rpc_sleep_on(q, task, action, timer);
	spin_unlock_bh(&rpc_queue_lock);
}
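
/*
 * Usage sketch (hypothetical caller, not taken from this file): a state
 * function typically sets a timeout and parks its task on a wait queue,
 *
 *	task->tk_timeout = 5 * HZ;
 *	rpc_sleep_on(&some_waitq, task, some_callback, NULL);
 *
 * after which the task resumes (via rpc_wake_up_task() or the default
 * timer) and some_callback runs as the pending callback before the next
 * tk_action step in __rpc_execute().
 */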

/**
 * __rpc_wake_up_task - wake up a single rpc_task
 * @task: task to be woken up
 *
 * Caller must hold rpc_queue_lock
 */
static void
__rpc_wake_up_task(struct rpc_task *task)
{
	dprintk("RPC: %4d __rpc_wake_up_task (now %ld inh %d)\n",
					task->tk_pid, jiffies, rpc_inhibit);

#ifdef RPC_DEBUG
	if (task->tk_magic != 0xf00baa) {
		printk(KERN_ERR "RPC: attempt to wake up non-existing task!\n");
		rpc_debug = ~0;
		rpc_show_tasks();
		return;
	}
#endif
	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}
	if (RPC_IS_RUNNING(task))
		return;

	__rpc_disable_timer(task);
	if (task->tk_rpcwait != &schedq)
		__rpc_remove_wait_queue(task);

	rpc_make_runnable(task);

	dprintk("RPC:      __rpc_wake_up_task done\n");
}

/*
 * Default timeout handler if none specified by user
 */
static void
__rpc_default_timer(struct rpc_task *task)
{
	dprintk("RPC: %d timeout (default timer)\n", task->tk_pid);
	task->tk_status = -ETIMEDOUT;
	rpc_wake_up_task(task);
}

/*
 * Wake up the specified task
 */
void
rpc_wake_up_task(struct rpc_task *task)
{
	if (RPC_IS_RUNNING(task))
		return;
	spin_lock_bh(&rpc_queue_lock);
	__rpc_wake_up_task(task);
	spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task *
rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	struct rpc_task	*task = NULL;

	dprintk("RPC:      wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
	spin_lock_bh(&rpc_queue_lock);
	task_for_first(task, &queue->tasks)
		__rpc_wake_up_task(task);
	spin_unlock_bh(&rpc_queue_lock);

	return task;
}

/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs rpc_queue_lock
 */
void
rpc_wake_up(struct rpc_wait_queue *queue)
{
	struct rpc_task *task;

	spin_lock_bh(&rpc_queue_lock);
	while (!list_empty(&queue->tasks))
		task_for_first(task, &queue->tasks)
			__rpc_wake_up_task(task);
	spin_unlock_bh(&rpc_queue_lock);
}

/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs rpc_queue_lock
 */
void
rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	struct rpc_task *task;

	spin_lock_bh(&rpc_queue_lock);
	while (!list_empty(&queue->tasks)) {
		task_for_first(task, &queue->tasks) {
			task->tk_status = status;
			__rpc_wake_up_task(task);
		}
	}
	spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Run a task at a later time
 */
static void	__rpc_atrun(struct rpc_task *);
void
rpc_delay(struct rpc_task *task, unsigned long delay)
{
	task->tk_timeout = delay;
	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
}
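
/*
 * Usage sketch (hypothetical caller): back off before retrying, e.g.
 *
 *	task->tk_action = some_retry_action;
 *	rpc_delay(task, HZ << 2);
 *
 * __rpc_atrun() below clears tk_status and wakes the task once the
 * delay (here roughly four seconds) has expired.
 */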

static void
__rpc_atrun(struct rpc_task *task)
{
	task->tk_status = 0;
	rpc_wake_up_task(task);
}

/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */
static int
__rpc_execute(struct rpc_task *task)
{
	int		status = 0;

	dprintk("RPC: %4d rpc_execute flgs %x\n",
				task->tk_pid, task->tk_flags);

	if (!RPC_IS_RUNNING(task)) {
		printk(KERN_WARNING "RPC: rpc_execute called for sleeping task!!\n");
		return 0;
	}

 restarted:
	while (1) {
		/*
		 * Execute any pending callback.
		 */
		if (RPC_DO_CALLBACK(task)) {
			/* Define a callback save pointer */
			void (*save_callback)(struct rpc_task *);

			/*
			 * If a callback exists, save it, reset it,
			 * call it.
			 * The save is needed to stop from resetting
			 * another callback set within the callback handler
			 * - Dave
			 */
			save_callback = task->tk_callback;
			task->tk_callback = NULL;
			save_callback(task);
		}

		/*
		 * Perform the next FSM step.
		 * tk_action may be NULL when the task has been killed
		 * by someone else.
		 */
		if (RPC_IS_RUNNING(task)) {
			/*
			 * Garbage collection of pending timers...
			 */
			rpc_delete_timer(task);
			if (!task->tk_action)
				break;
			task->tk_action(task);
		}

		/*
		 * Check whether task is sleeping.
		 */
		spin_lock_bh(&rpc_queue_lock);
		if (!RPC_IS_RUNNING(task)) {
			rpc_set_sleeping(task);
			if (RPC_IS_ASYNC(task)) {
				spin_unlock_bh(&rpc_queue_lock);
				return 0;
			}
		}
		spin_unlock_bh(&rpc_queue_lock);

		while (RPC_IS_SLEEPING(task)) {
			/* sync task: sleep here */
			dprintk("RPC: %4d sync task going to sleep\n",
							task->tk_pid);
			if (current->pid == rpciod_pid)
				printk(KERN_ERR "RPC: rpciod waiting on sync task!\n");

			__wait_event(task->tk_wait, !RPC_IS_SLEEPING(task));
			dprintk("RPC: %4d sync task resuming\n", task->tk_pid);

			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			if (task->tk_client->cl_intr && signalled()) {
				dprintk("RPC: %4d got signal\n", task->tk_pid);
				task->tk_flags |= RPC_TASK_KILLED;
				rpc_exit(task, -ERESTARTSYS);
				rpc_wake_up_task(task);
			}
		}
	}

	if (task->tk_exit) {
		task->tk_exit(task);
		/* If tk_action is non-null, the user wants us to restart */
		if (task->tk_action) {
			if (!RPC_ASSASSINATED(task)) {
				/* Release RPC slot and buffer memory */
				if (task->tk_rqstp)
					xprt_release(task);
				if (task->tk_buffer) {
					rpc_free(task->tk_buffer);
					task->tk_buffer = NULL;
				}
				goto restarted;
			}
			printk(KERN_ERR "RPC: dead task tries to walk away.\n");
		}
	}

	dprintk("RPC: %4d exit() = %d\n", task->tk_pid, task->tk_status);
	status = task->tk_status;

	/* Release all resources associated with the task */
	rpc_release_task(task);

	return status;
}

/*
 * User-visible entry point to the scheduler.
 *
 * This may be called recursively if e.g. an async NFS task updates
 * the attributes and finds that dirty pages must be flushed.
 * NOTE: Upon exit of this function the task is guaranteed to be
 *	 released. In particular note that tk_release() will have
 *	 been called, so your task memory may have been freed.
 */
int
rpc_execute(struct rpc_task *task)
{
	int status = -EIO;
	if (rpc_inhibit) {
		printk(KERN_INFO "RPC: execution inhibited!\n");
		goto out_release;
	}

	status = -EWOULDBLOCK;
	if (task->tk_active) {
		printk(KERN_ERR "RPC: active task was run twice!\n");
		goto out_err;
	}

	task->tk_active = 1;
	rpc_set_running(task);
	return __rpc_execute(task);
 out_release:
	rpc_release_task(task);
 out_err:
	return status;
}

/*
 * This is our own little scheduler for async RPC tasks.
 */
static void
__rpc_schedule(void)
{
	struct rpc_task	*task;
	int		count = 0;

	dprintk("RPC:      rpc_schedule enter\n");
	while (1) {
		spin_lock_bh(&rpc_queue_lock);

		task_for_first(task, &schedq.tasks) {
			__rpc_remove_wait_queue(task);
			spin_unlock_bh(&rpc_queue_lock);

			__rpc_execute(task);
		} else {
			spin_unlock_bh(&rpc_queue_lock);
			break;
		}

		if (++count >= 200 || current->need_resched) {
			count = 0;
			schedule();
		}
	}
	dprintk("RPC:      rpc_schedule leave\n");
}
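
/*
 * Note on __rpc_schedule(): schedq is drained one task at a time, and
 * rpc_queue_lock is dropped before __rpc_execute() runs the task. A
 * schedule() is forced roughly every 200 tasks, or whenever a
 * reschedule is pending, so that rpciod does not monopolise the CPU.
 */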

/*
 * Allocate memory for RPC purpose.
 *
 * This is yet another tricky issue: For sync requests issued by
 * a user process, we want to make kmalloc sleep if there isn't
 * enough memory. Async requests should not sleep too excessively
 * because that will block rpciod (but that's not dramatic when
 * it's starved of memory anyway). Finally, swapout requests should
 * never sleep at all, and should not trigger another swap_out
 * request through kmalloc which would just increase memory contention.
 *
 * I hope the following gets it right, which gives async requests
 * a slight advantage over sync requests (good for writeback, debatable
 * for readahead):
 *
 *   sync user requests:	GFP_KERNEL
 *   async requests:		GFP_RPC		(== GFP_NOFS)
 *   swap requests:		GFP_ATOMIC	(or new GFP_SWAPPER)
 */
void *
rpc_allocate(unsigned int flags, unsigned int size)
{
	u32	*buffer;
	int	gfp;

	if (flags & RPC_TASK_SWAPPER)
		gfp = GFP_ATOMIC;
	else if (flags & RPC_TASK_ASYNC)
		gfp = GFP_RPC;
	else
		gfp = GFP_KERNEL;

	do {
		if ((buffer = (u32 *) kmalloc(size, gfp)) != NULL) {
			dprintk("RPC:      allocated buffer %p\n", buffer);
			return buffer;
		}
		if ((flags & RPC_TASK_SWAPPER) && size <= sizeof(swap_buffer)
		    && rpc_lock_swapbuf()) {
			dprintk("RPC:      used last-ditch swap buffer\n");
			return swap_buffer;
		}
		if (flags & RPC_TASK_ASYNC)
			return NULL;
		yield();
	} while (!signalled());

	return NULL;
}

void
rpc_free(void *buffer)
{
	if (buffer != swap_buffer) {
		kfree(buffer);
		return;
	}
	rpc_unlock_swapbuf();
}
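
/*
 * Note: buffers obtained from rpc_allocate() must always be released
 * through rpc_free(), never plain kfree(), because the returned pointer
 * may be the static last-ditch swap_buffer rather than kmalloc'd memory.
 */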

/*
 * Creation and deletion of RPC task structures
 */
inline void
rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt,
				rpc_action callback, int flags)
{
	memset(task, 0, sizeof(*task));
	init_timer(&task->tk_timer);
	task->tk_timer.data     = (unsigned long) task;
	task->tk_timer.function = (void (*)(unsigned long)) rpc_run_timer;
	task->tk_client = clnt;
	task->tk_flags  = flags;
	task->tk_exit   = callback;
	init_waitqueue_head(&task->tk_wait);
	if (current->uid != current->fsuid || current->gid != current->fsgid)
		task->tk_flags |= RPC_TASK_SETUID;

	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;
	task->tk_suid_retry = 1;

	/* Add to global list of all tasks */
	spin_lock(&rpc_sched_lock);
	list_add(&task->tk_task, &all_tasks);
	spin_unlock(&rpc_sched_lock);

	if (clnt)
		atomic_inc(&clnt->cl_users);

#ifdef RPC_DEBUG
	task->tk_magic = 0xf00baa;
	task->tk_pid = rpc_task_id++;
#endif
	dprintk("RPC: %4d new task procpid %d\n", task->tk_pid,
				current->pid);
}

static void
rpc_default_free_task(struct rpc_task *task)
{
	dprintk("RPC: %4d freeing task\n", task->tk_pid);
	rpc_free(task);
}

/*
 * Create a new task for the specified client.  We have to
 * clean up after an allocation failure, as the client may
 * have specified "oneshot".
 */
struct rpc_task *
rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
{
	struct rpc_task	*task;

	task = (struct rpc_task *) rpc_allocate(flags, sizeof(*task));
	if (!task)
		goto cleanup;

	rpc_init_task(task, clnt, callback, flags);

	/* Replace tk_release */
	task->tk_release = rpc_default_free_task;

	dprintk("RPC: %4d allocated task\n", task->tk_pid);
	task->tk_flags |= RPC_TASK_DYNAMIC;
out:
	return task;

cleanup:
	/* Check whether to release the client */
	if (clnt) {
		printk("rpc_new_task: failed, users=%d, oneshot=%d\n",
			atomic_read(&clnt->cl_users), clnt->cl_oneshot);
		atomic_inc(&clnt->cl_users); /* pretend we were used ... */
		rpc_release_client(clnt);
	}
	goto out;
}

void
rpc_release_task(struct rpc_task *task)
{
	dprintk("RPC: %4d release task\n", task->tk_pid);

#ifdef RPC_DEBUG
	if (task->tk_magic != 0xf00baa) {
		printk(KERN_ERR "RPC: attempt to release a non-existing task!\n");
		rpc_debug = ~0;
		rpc_show_tasks();
		return;
	}
#endif

	/* Remove from global task list */
	spin_lock(&rpc_sched_lock);
	list_del(&task->tk_task);
	spin_unlock(&rpc_sched_lock);

	/* Protect the execution below. */
	spin_lock_bh(&rpc_queue_lock);

	/* Disable timer to prevent zombie wakeup */
	__rpc_disable_timer(task);

	/* Remove from any wait queue we're still on */
	__rpc_remove_wait_queue(task);

	task->tk_active = 0;

	spin_unlock_bh(&rpc_queue_lock);

	/* Synchronously delete any running timer */
	rpc_delete_timer(task);

	/* Release resources */
	if (task->tk_rqstp)
		xprt_release(task);
	if (task->tk_msg.rpc_cred)
		rpcauth_unbindcred(task);
	if (task->tk_buffer) {
		rpc_free(task->tk_buffer);
		task->tk_buffer = NULL;
	}
	if (task->tk_client) {
		rpc_release_client(task->tk_client);
		task->tk_client = NULL;
	}

#ifdef RPC_DEBUG
	task->tk_magic = 0;
#endif
	if (task->tk_release)
		task->tk_release(task);
}

/**
 * rpc_find_parent - find the parent of a child task.
 * @child: child task
 *
 * Checks that the parent task is still sleeping on the
 * queue 'childq'. If so returns a pointer to the parent.
 * Upon failure returns NULL.
 *
 * Caller must hold rpc_queue_lock
 */
static inline struct rpc_task *
rpc_find_parent(struct rpc_task *child)
{
	struct rpc_task	*task, *parent;
	struct list_head *le;

	parent = (struct rpc_task *) child->tk_calldata;
	task_for_each(task, le, &childq.tasks)
		if (task == parent)
			return parent;

	return NULL;
}

static void
rpc_child_exit(struct rpc_task *child)
{
	struct rpc_task	*parent;

	spin_lock_bh(&rpc_queue_lock);
	if ((parent = rpc_find_parent(child)) != NULL) {
		parent->tk_status = child->tk_status;
		__rpc_wake_up_task(parent);
	}
	spin_unlock_bh(&rpc_queue_lock);
}

/*
 * Note: rpc_new_task releases the client after a failure.
 */
struct rpc_task *
rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
{
	struct rpc_task	*task;

	task = rpc_new_task(clnt, NULL, RPC_TASK_ASYNC | RPC_TASK_CHILD);
	if (!task)
		goto fail;
	task->tk_exit = rpc_child_exit;
	task->tk_calldata = parent;
	return task;

fail:
	parent->tk_status = -ENOMEM;
	return NULL;
}

void
rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
{
	spin_lock_bh(&rpc_queue_lock);
	/* N.B. Is it possible for the child to have already finished? */
	__rpc_sleep_on(&childq, task, func, NULL);
	rpc_schedule_run(child);
	spin_unlock_bh(&rpc_queue_lock);
}
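
/*
 * Note on the parent/child handshake: rpc_new_child() creates an async
 * child whose exit handler, rpc_child_exit(), copies its status to the
 * parent and wakes the parent off childq. rpc_run_child() puts the
 * parent to sleep on childq and schedules the child within a single
 * rpc_queue_lock critical section, which is what keeps the child from
 * completing before the parent has been queued.
 */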

void
rpc_killall_tasks(struct rpc_clnt *clnt)
{
	struct rpc_task	*rovr;
	struct list_head *le;

	dprintk("RPC:      killing all tasks for client %p\n", clnt);

	/*
	 * Spin lock all_tasks to prevent changes...
	 */
	spin_lock(&rpc_sched_lock);
	alltask_for_each(rovr, le, &all_tasks)
		if (!clnt || rovr->tk_client == clnt) {
			rovr->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(rovr, -EIO);
			rpc_wake_up_task(rovr);
		}
	spin_unlock(&rpc_sched_lock);
}

static DECLARE_MUTEX_LOCKED(rpciod_running);

static inline int
rpciod_task_pending(void)
{
	return !list_empty(&schedq.tasks);
}


/*
 * This is the rpciod kernel thread
 */
static int
rpciod(void *ptr)
{
	wait_queue_head_t *assassin = (wait_queue_head_t*) ptr;
	int		rounds = 0;

	MOD_INC_USE_COUNT;
	lock_kernel();
	/*
	 * Let our maker know we're running ...
	 */
	rpciod_pid = current->pid;
	up(&rpciod_running);

	daemonize();

	spin_lock_irq(&current->sigmask_lock);
	siginitsetinv(&current->blocked, sigmask(SIGKILL));
	recalc_sigpending(current);
	spin_unlock_irq(&current->sigmask_lock);

	strcpy(current->comm, "rpciod");

	dprintk("RPC: rpciod starting (pid %d)\n", rpciod_pid);
	while (rpciod_users) {
		if (signalled()) {
			rpciod_killall();
			flush_signals(current);
		}
		__rpc_schedule();

		if (++rounds >= 64) {	/* safeguard */
			schedule();
			rounds = 0;
		}

		if (!rpciod_task_pending()) {
			dprintk("RPC: rpciod back to sleep\n");
			wait_event_interruptible(rpciod_idle, rpciod_task_pending());
			dprintk("RPC: switch to rpciod\n");
			rounds = 0;
		}
	}

	dprintk("RPC: rpciod shutdown commences\n");
	if (!list_empty(&all_tasks)) {
		printk(KERN_ERR "rpciod: active tasks at shutdown?!\n");
		rpciod_killall();
	}

	rpciod_pid = 0;
	wake_up(assassin);

	dprintk("RPC: rpciod exiting\n");
	MOD_DEC_USE_COUNT;
	return 0;
}

static void
rpciod_killall(void)
{
	unsigned long flags;

	while (!list_empty(&all_tasks)) {
		current->sigpending = 0;
		rpc_killall_tasks(NULL);
		__rpc_schedule();
		if (!list_empty(&all_tasks)) {
			dprintk("rpciod_killall: waiting for tasks to exit\n");
			yield();
		}
	}

	spin_lock_irqsave(&current->sigmask_lock, flags);
	recalc_sigpending(current);
	spin_unlock_irqrestore(&current->sigmask_lock, flags);
}

/*
 * Start up the rpciod process if it's not already running.
 */
int
rpciod_up(void)
{
	int error = 0;

	MOD_INC_USE_COUNT;
	down(&rpciod_sema);
	dprintk("rpciod_up: pid %d, users %d\n", rpciod_pid, rpciod_users);
	rpciod_users++;
	if (rpciod_pid)
		goto out;
	/*
	 * If there's no pid, we should be the first user.
	 */
	if (rpciod_users > 1)
		printk(KERN_WARNING "rpciod_up: no pid, %d users??\n", rpciod_users);
	/*
	 * Create the rpciod thread and wait for it to start.
	 */
	error = kernel_thread(rpciod, &rpciod_killer, 0);
	if (error < 0) {
		printk(KERN_WARNING "rpciod_up: create thread failed, error=%d\n", error);
		rpciod_users--;
		goto out;
	}
	down(&rpciod_running);
	error = 0;
out:
	up(&rpciod_sema);
	MOD_DEC_USE_COUNT;
	return error;
}

void
rpciod_down(void)
{
	unsigned long flags;

	MOD_INC_USE_COUNT;
	down(&rpciod_sema);
	dprintk("rpciod_down pid %d sema %d\n", rpciod_pid, rpciod_users);
	if (rpciod_users) {
		if (--rpciod_users)
			goto out;
	} else
		printk(KERN_WARNING "rpciod_down: pid=%d, no users??\n", rpciod_pid);

	if (!rpciod_pid) {
		dprintk("rpciod_down: Nothing to do!\n");
		goto out;
	}

	kill_proc(rpciod_pid, SIGKILL, 1);
	/*
	 * Usually rpciod will exit very quickly, so we
	 * wait briefly before checking the process id.
	 */
	current->sigpending = 0;
	yield();
	/*
	 * Display a message if we're going to wait longer.
	 */
	while (rpciod_pid) {
		dprintk("rpciod_down: waiting for pid %d to exit\n", rpciod_pid);
		if (signalled()) {
			dprintk("rpciod_down: caught signal\n");
			break;
		}
		interruptible_sleep_on(&rpciod_killer);
	}
	spin_lock_irqsave(&current->sigmask_lock, flags);
	recalc_sigpending(current);
	spin_unlock_irqrestore(&current->sigmask_lock, flags);
out:
	up(&rpciod_sema);
	MOD_DEC_USE_COUNT;
}
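
/*
 * Note: rpciod_up() and rpciod_down() reference-count the daemon. The
 * first user forks rpciod and waits on rpciod_running; the last user
 * sends it SIGKILL and then waits on rpciod_killer until rpciod_pid
 * has been cleared.
 */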

#ifdef RPC_DEBUG
void rpc_show_tasks(void)
{
	struct list_head *le;
	struct rpc_task *t;

	spin_lock(&rpc_sched_lock);
	if (list_empty(&all_tasks)) {
		spin_unlock(&rpc_sched_lock);
		return;
	}
	printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
		"-rpcwait -action- --exit--\n");
	alltask_for_each(t, le, &all_tasks)
		printk("%05d %04d %04x %06d %8p %6d %8p %08ld %8s %8p %8p\n",
			t->tk_pid, t->tk_msg.rpc_proc, t->tk_flags, t->tk_status,
			t->tk_client, t->tk_client->cl_prog,
			t->tk_rqstp, t->tk_timeout,
			t->tk_rpcwait ? rpc_qname(t->tk_rpcwait) : " <NULL> ",
			t->tk_action, t->tk_exit);
	spin_unlock(&rpc_sched_lock);
}
#endif