/*
 * Copyright (c) 1993-1995, 1999-2008 Apple Inc. All rights reserved.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_START@
 *
 * This file contains Original Code and/or Modifications of Original Code
 * as defined in and that are subject to the Apple Public Source License
 * Version 2.0 (the 'License'). You may not use this file except in
 * compliance with the License. The rights granted to you under the License
 * may not be used to create, or enable the creation or redistribution of,
 * unlawful or unlicensed copies of an Apple operating system, or to
 * circumvent, violate, or enable the circumvention or violation of, any
 * terms of an Apple operating system software license agreement.
 *
 * Please obtain a copy of the License at
 * http://www.opensource.apple.com/apsl/ and read it before using this file.
 *
 * The Original Code and all software distributed under the License are
 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
 * Please see the License for the specific language governing rights and
 * limitations under the License.
 *
 * @APPLE_OSREFERENCE_LICENSE_HEADER_END@
 */

#include <mach/mach_types.h>
#include <mach/thread_act.h>

#include <kern/kern_types.h>
#include <kern/zalloc.h>
#include <kern/sched_prim.h>
#include <kern/clock.h>
#include <kern/task.h>
#include <kern/thread.h>
#include <kern/wait_queue.h>

#include <vm/vm_pageout.h>

#include <kern/thread_call.h>
#include <kern/call_entry.h>
#include <kern/timer_call.h>

#include <libkern/OSAtomic.h>

#include <sys/kdebug.h>
#if CONFIG_DTRACE
#include <mach/sdt.h>
#endif

static zone_t			thread_call_zone;
static struct wait_queue	daemon_wqueue;

struct thread_call_group {
	queue_head_t		pending_queue;
	uint32_t		pending_count;

	queue_head_t		delayed_queue;
	uint32_t		delayed_count;

	timer_call_data_t	delayed_timer;
	timer_call_data_t	dealloc_timer;

	struct wait_queue	idle_wqueue;
	uint32_t		idle_count, active_count;

	integer_t		pri;
	uint32_t		target_thread_count;
	uint64_t		idle_timestamp;

	uint32_t		flags;
	sched_call_t		sched_call;
};

typedef struct thread_call_group	*thread_call_group_t;

#define TCG_PARALLEL		0x01
#define TCG_DEALLOC_ACTIVE	0x02

#define THREAD_CALL_GROUP_COUNT		4
#define THREAD_CALL_THREAD_MIN		4
#define INTERNAL_CALL_COUNT		768
#define THREAD_CALL_DEALLOC_INTERVAL_NS (5 * 1000 * 1000) /* 5 ms */
#define THREAD_CALL_ADD_RATIO		4
#define THREAD_CALL_MACH_FACTOR_CAP	3

static struct thread_call_group	thread_call_groups[THREAD_CALL_GROUP_COUNT];
static boolean_t		thread_call_daemon_awake;
static thread_call_data_t	internal_call_storage[INTERNAL_CALL_COUNT];
static queue_head_t		thread_call_internal_queue;
static uint64_t			thread_call_dealloc_interval_abs;

static __inline__ thread_call_t	_internal_call_allocate(void);
static __inline__ void		_internal_call_release(thread_call_t call);
static __inline__ boolean_t	_pending_call_enqueue(thread_call_t call, thread_call_group_t group);
static __inline__ boolean_t	_delayed_call_enqueue(thread_call_t call, thread_call_group_t group, uint64_t deadline);
static __inline__ boolean_t	_call_dequeue(thread_call_t call, thread_call_group_t group);
static __inline__ void		thread_call_wake(thread_call_group_t group);
static __inline__ void		_set_delayed_call_timer(thread_call_t call, thread_call_group_t group);
static boolean_t		_remove_from_pending_queue(thread_call_func_t func, thread_call_param_t param0, boolean_t remove_all);
static boolean_t		_remove_from_delayed_queue(thread_call_func_t func, thread_call_param_t param0, boolean_t remove_all);
static void			thread_call_daemon(void *arg);
static void			thread_call_thread(thread_call_group_t group, wait_result_t wres);
extern void			thread_call_delayed_timer(timer_call_param_t p0, timer_call_param_t p1);
static void			thread_call_dealloc_timer(timer_call_param_t p0, timer_call_param_t p1);
static void			thread_call_group_setup(thread_call_group_t group, thread_call_priority_t pri, uint32_t target_thread_count, boolean_t parallel);
static void			sched_call_thread(int type, thread_t thread);
static void			thread_call_start_deallocate_timer(thread_call_group_t group);
static void			thread_call_wait_locked(thread_call_t call);

#define qe(x)		((queue_entry_t)(x))
#define TC(x)		((thread_call_t)(x))


lck_grp_t               thread_call_queues_lck_grp;
lck_grp_t               thread_call_lck_grp;
lck_attr_t              thread_call_lck_attr;
lck_grp_attr_t          thread_call_lck_grp_attr;

#if defined(__i386__) || defined(__x86_64__)
lck_mtx_t		thread_call_lock_data;
#else
lck_spin_t		thread_call_lock_data;
#endif


#define thread_call_lock_spin()			\
	lck_mtx_lock_spin_always(&thread_call_lock_data)

#define thread_call_unlock()			\
	lck_mtx_unlock_always(&thread_call_lock_data)


static inline spl_t
disable_ints_and_lock(void)
{
	spl_t s;

	s = splsched();
	thread_call_lock_spin();

	return s;
}

static inline void
enable_ints_and_unlock(void)
{
	thread_call_unlock();
	(void)spllo();
}
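
/*
 * Typical usage of the two helpers above, as seen throughout this file
 * (illustrative sketch only):
 *
 *	(void) disable_ints_and_lock();
 *	... examine or modify thread call / group state ...
 *	enable_ints_and_unlock();
 *
 * Note that enable_ints_and_unlock() drops to spllo() rather than
 * restoring the level returned by disable_ints_and_lock().
 */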


static inline boolean_t
group_isparallel(thread_call_group_t group)
{
	return ((group->flags & TCG_PARALLEL) != 0);
}

static boolean_t
thread_call_group_should_add_thread(thread_call_group_t group)
{
	uint32_t thread_count;

	if (!group_isparallel(group)) {
		if (group->pending_count > 0 && group->active_count == 0) {
			return TRUE;
		}

		return FALSE;
	}

	if (group->pending_count > 0) {
		if (group->idle_count > 0) {
			panic("Pending work, but threads are idle?");
		}

		thread_count = group->active_count;

		/*
		 * Add a thread if either there are no threads,
		 * the group has fewer than its target number of
		 * threads, or the amount of work is large relative
		 * to the number of threads.  In the last case, pay attention
		 * to the total load on the system, and back off if
		 * it's high.
		 */
		if ((thread_count == 0) ||
			(thread_count < group->target_thread_count) ||
			((group->pending_count > THREAD_CALL_ADD_RATIO * thread_count) &&
			 (sched_mach_factor < THREAD_CALL_MACH_FACTOR_CAP))) {
			return TRUE;
		}
	}

	return FALSE;
}
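
/*
 * Worked example of the heuristic above (illustrative only): with
 * THREAD_CALL_ADD_RATIO == 4 and THREAD_CALL_MACH_FACTOR_CAP == 3, a
 * parallel group that already has 3 active threads and has reached its
 * target_thread_count only grows once pending_count exceeds 4 * 3 == 12
 * and sched_mach_factor is below 3; a group with no threads, or with
 * fewer threads than its target, grows as soon as any work is pending.
 */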

static inline integer_t
thread_call_priority_to_sched_pri(thread_call_priority_t pri)
{
	switch (pri) {
	case THREAD_CALL_PRIORITY_HIGH:
		return BASEPRI_PREEMPT;
	case THREAD_CALL_PRIORITY_KERNEL:
		return BASEPRI_KERNEL;
	case THREAD_CALL_PRIORITY_USER:
		return BASEPRI_DEFAULT;
	case THREAD_CALL_PRIORITY_LOW:
		return DEPRESSPRI;
	default:
		panic("Invalid priority.");
	}

	return 0;
}

/* Lock held */
static inline thread_call_group_t
thread_call_get_group(
		thread_call_t call)
{
	thread_call_priority_t	pri = call->tc_pri;

	assert(pri == THREAD_CALL_PRIORITY_LOW ||
			pri == THREAD_CALL_PRIORITY_USER ||
			pri == THREAD_CALL_PRIORITY_KERNEL ||
			pri == THREAD_CALL_PRIORITY_HIGH);

	return &thread_call_groups[pri];
}

static void
thread_call_group_setup(
		thread_call_group_t		group,
		thread_call_priority_t		pri,
		uint32_t			target_thread_count,
		boolean_t			parallel)
{
	queue_init(&group->pending_queue);
	queue_init(&group->delayed_queue);

	timer_call_setup(&group->delayed_timer, thread_call_delayed_timer, group);
	timer_call_setup(&group->dealloc_timer, thread_call_dealloc_timer, group);

	wait_queue_init(&group->idle_wqueue, SYNC_POLICY_FIFO);

	group->target_thread_count = target_thread_count;
	group->pri = thread_call_priority_to_sched_pri(pri);

	group->sched_call = sched_call_thread;
	if (parallel) {
		group->flags |= TCG_PARALLEL;
		group->sched_call = NULL;
	}
}

/*
 * Simple wrapper for creating threads bound to
 * thread call groups.
 */
static kern_return_t
thread_call_thread_create(
		thread_call_group_t             group)
{
	thread_t thread;
	kern_return_t result;

	result = kernel_thread_start_priority((thread_continue_t)thread_call_thread, group, group->pri, &thread);
	if (result != KERN_SUCCESS) {
		return result;
	}

	if (group->pri < BASEPRI_PREEMPT) {
		/*
		 * Threads below BASEPRI_PREEMPT (the non-default priority
		 * groups) are not allowed to run to completion in the kernel
		 * while higher priority threads are runnable, so mark them
		 * eagerly preemptible.
		 */
		thread_set_eager_preempt(thread);
	}

	thread_deallocate(thread);
	return KERN_SUCCESS;
}

/*
 *	thread_call_initialize:
 *
 *	Initialize this module, called
 *	early during system initialization.
 */
void
thread_call_initialize(void)
{
	thread_call_t			call;
	kern_return_t			result;
	thread_t			thread;
	int				i;

	i = sizeof (thread_call_data_t);
	thread_call_zone = zinit(i, 4096 * i, 16 * i, "thread_call");
	zone_change(thread_call_zone, Z_CALLERACCT, FALSE);
	zone_change(thread_call_zone, Z_NOENCRYPT, TRUE);

	lck_attr_setdefault(&thread_call_lck_attr);
	lck_grp_attr_setdefault(&thread_call_lck_grp_attr);
	lck_grp_init(&thread_call_queues_lck_grp, "thread_call_queues", &thread_call_lck_grp_attr);
	lck_grp_init(&thread_call_lck_grp, "thread_call", &thread_call_lck_grp_attr);

#if defined(__i386__) || defined(__x86_64__)
	lck_mtx_init(&thread_call_lock_data, &thread_call_lck_grp, &thread_call_lck_attr);
#else
	lck_spin_init(&thread_call_lock_data, &thread_call_lck_grp, &thread_call_lck_attr);
#endif

	nanotime_to_absolutetime(0, THREAD_CALL_DEALLOC_INTERVAL_NS, &thread_call_dealloc_interval_abs);
	wait_queue_init(&daemon_wqueue, SYNC_POLICY_FIFO);

	thread_call_group_setup(&thread_call_groups[THREAD_CALL_PRIORITY_LOW], THREAD_CALL_PRIORITY_LOW, 0, TRUE);
	thread_call_group_setup(&thread_call_groups[THREAD_CALL_PRIORITY_USER], THREAD_CALL_PRIORITY_USER, 0, TRUE);
	thread_call_group_setup(&thread_call_groups[THREAD_CALL_PRIORITY_KERNEL], THREAD_CALL_PRIORITY_KERNEL, 1, TRUE);
	thread_call_group_setup(&thread_call_groups[THREAD_CALL_PRIORITY_HIGH], THREAD_CALL_PRIORITY_HIGH, THREAD_CALL_THREAD_MIN, FALSE);

	disable_ints_and_lock();

	queue_init(&thread_call_internal_queue);
	for (
			call = internal_call_storage;
			call < &internal_call_storage[INTERNAL_CALL_COUNT];
			call++) {

		enqueue_tail(&thread_call_internal_queue, qe(call));
	}

	thread_call_daemon_awake = TRUE;

	enable_ints_and_unlock();

	result = kernel_thread_start_priority((thread_continue_t)thread_call_daemon, NULL, BASEPRI_PREEMPT + 1, &thread);
	if (result != KERN_SUCCESS)
		panic("thread_call_initialize");

	thread_deallocate(thread);
}

void
thread_call_setup(
	thread_call_t			call,
	thread_call_func_t		func,
	thread_call_param_t		param0)
{
	bzero(call, sizeof(*call));
	call_entry_setup((call_entry_t)call, func, param0);
	call->tc_pri = THREAD_CALL_PRIORITY_HIGH; /* Default priority */
}

/*
 *	_internal_call_allocate:
 *
 *	Allocate an internal callout entry.
 *
 *	Called with thread_call_lock held.
 */
static __inline__ thread_call_t
_internal_call_allocate(void)
{
	thread_call_t		call;

	if (queue_empty(&thread_call_internal_queue))
		panic("_internal_call_allocate");

	call = TC(dequeue_head(&thread_call_internal_queue));

	return (call);
}

/*
 *	_internal_call_release:
 *
 *	Release an internal callout entry which
 *	is no longer pending (or delayed).
 *
 *	Called with thread_call_lock held.
 */
static __inline__ void
_internal_call_release(
	thread_call_t		call)
{
	if (call >= internal_call_storage &&
			call < &internal_call_storage[INTERNAL_CALL_COUNT])
		enqueue_head(&thread_call_internal_queue, qe(call));
}

/*
 *	_pending_call_enqueue:
 *
 *	Place an entry at the end of the
 *	pending queue, to be executed soon.
 *
 *	Returns TRUE if the entry was already
 *	on a queue.
 *
 *	Called with thread_call_lock held.
 */
static __inline__ boolean_t
_pending_call_enqueue(
	thread_call_t		call,
	thread_call_group_t	group)
{
	queue_head_t		*old_queue;

	old_queue = call_entry_enqueue_tail(CE(call), &group->pending_queue);

	if (old_queue == NULL) {
		call->tc_submit_count++;
	}

	group->pending_count++;

	thread_call_wake(group);

	return (old_queue != NULL);
}

/*
 *	_delayed_call_enqueue:
 *
 *	Place an entry on the delayed queue,
 *	after existing entries with an earlier
 *	(or identical) deadline.
 *
 *	Returns TRUE if the entry was already
 *	on a queue.
 *
 *	Called with thread_call_lock held.
 */
static __inline__ boolean_t
_delayed_call_enqueue(
	thread_call_t		call,
	thread_call_group_t	group,
	uint64_t		deadline)
{
	queue_head_t		*old_queue;

	old_queue = call_entry_enqueue_deadline(CE(call), &group->delayed_queue, deadline);

	if (old_queue == &group->pending_queue)
		group->pending_count--;
	else if (old_queue == NULL)
		call->tc_submit_count++;

	return (old_queue != NULL);
}

/*
 *	_call_dequeue:
 *
 *	Remove an entry from a queue.
 *
 *	Returns TRUE if the entry was on a queue.
 *
 *	Called with thread_call_lock held.
 */
static __inline__ boolean_t
_call_dequeue(
	thread_call_t		call,
	thread_call_group_t	group)
{
	queue_head_t		*old_queue;

	old_queue = call_entry_dequeue(CE(call));

	if (old_queue != NULL) {
		call->tc_finish_count++;
		if (old_queue == &group->pending_queue)
			group->pending_count--;
	}

	return (old_queue != NULL);
}

/*
 *	_set_delayed_call_timer:
 *
 *	Reset the timer so that it
 *	next expires when the entry is due.
 *
 *	Called with thread_call_lock held.
 */
static __inline__ void
_set_delayed_call_timer(
	thread_call_t		call,
	thread_call_group_t	group)
{
	timer_call_enter(&group->delayed_timer, call->tc_call.deadline, 0);
}

/*
 *	_remove_from_pending_queue:
 *
 *	Remove the first (or all) matching
 *	entries from the pending queue.
 *
 *	Returns TRUE if any matching entries
 *	were found.
 *
 *	Called with thread_call_lock held.
 */
static boolean_t
_remove_from_pending_queue(
	thread_call_func_t		func,
	thread_call_param_t		param0,
	boolean_t			remove_all)
{
	boolean_t			call_removed = FALSE;
	thread_call_t			call;
	thread_call_group_t		group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH];

	call = TC(queue_first(&group->pending_queue));

	while (!queue_end(&group->pending_queue, qe(call))) {
		if (call->tc_call.func == func &&
				call->tc_call.param0 == param0) {
			thread_call_t	next = TC(queue_next(qe(call)));

			_call_dequeue(call, group);

			_internal_call_release(call);

			call_removed = TRUE;
			if (!remove_all)
				break;

			call = next;
		}
		else
			call = TC(queue_next(qe(call)));
	}

	return (call_removed);
}

/*
 *	_remove_from_delayed_queue:
 *
 *	Remove the first (or all) matching
 *	entries from the delayed queue.
 *
 *	Returns TRUE if any matching entries
 *	were found.
 *
 *	Called with thread_call_lock held.
 */
static boolean_t
_remove_from_delayed_queue(
	thread_call_func_t		func,
	thread_call_param_t		param0,
	boolean_t			remove_all)
{
	boolean_t			call_removed = FALSE;
	thread_call_t			call;
	thread_call_group_t		group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH];

	call = TC(queue_first(&group->delayed_queue));

	while (!queue_end(&group->delayed_queue, qe(call))) {
		if (call->tc_call.func == func &&
				call->tc_call.param0 == param0) {
			thread_call_t	next = TC(queue_next(qe(call)));

			_call_dequeue(call, group);

			_internal_call_release(call);

			call_removed = TRUE;
			if (!remove_all)
				break;

			call = next;
		}
		else
			call = TC(queue_next(qe(call)));
	}

	return (call_removed);
}

#ifndef	__LP64__

/*
 *	thread_call_func:
 *
 *	Enqueue a function callout.
 *
 *	Guarantees { function, argument }
 *	uniqueness if unique_call is TRUE.
 */
void
thread_call_func(
	thread_call_func_t		func,
	thread_call_param_t		param,
	boolean_t			unique_call)
{
	thread_call_t		call;
	thread_call_group_t	group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH];
	spl_t			s;

	s = splsched();
	thread_call_lock_spin();

	call = TC(queue_first(&group->pending_queue));

	while (unique_call && !queue_end(&group->pending_queue, qe(call))) {
		if (call->tc_call.func == func && call->tc_call.param0 == param) {
			break;
		}

		call = TC(queue_next(qe(call)));
	}

	if (!unique_call || queue_end(&group->pending_queue, qe(call))) {
		call = _internal_call_allocate();
		call->tc_call.func	= func;
		call->tc_call.param0	= param;
		call->tc_call.param1	= NULL;

		_pending_call_enqueue(call, group);
	}

	thread_call_unlock();
	splx(s);
}

#endif	/* __LP64__ */

/*
 *	thread_call_func_delayed:
 *
 *	Enqueue a function callout to
 *	occur at the stated time.
 */
void
thread_call_func_delayed(
		thread_call_func_t		func,
		thread_call_param_t		param,
		uint64_t			deadline)
{
	thread_call_t		call;
	thread_call_group_t	group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH];
	spl_t			s;

	s = splsched();
	thread_call_lock_spin();

	call = _internal_call_allocate();
	call->tc_call.func	= func;
	call->tc_call.param0	= param;
	call->tc_call.param1	= 0;

	_delayed_call_enqueue(call, group, deadline);

	if (queue_first(&group->delayed_queue) == qe(call))
		_set_delayed_call_timer(call, group);

	thread_call_unlock();
	splx(s);
}
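
/*
 * Example (sketch, not an API contract): scheduling a one-shot callout
 * 100 ms from now.  clock_interval_to_deadline() is the usual way callers
 * derive the absolute-time deadline; "my_callout" is a hypothetical
 * client function, not something defined in this file.
 *
 *	extern void my_callout(thread_call_param_t p0, thread_call_param_t p1);
 *	uint64_t deadline;
 *
 *	clock_interval_to_deadline(100, NSEC_PER_MSEC, &deadline);
 *	thread_call_func_delayed(my_callout, NULL, deadline);
 */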

/*
 *	thread_call_func_cancel:
 *
 *	Dequeue a function callout.
 *
 *	Removes one (or all) { function, argument }
 *	instance(s) from either (or both)
 *	the pending and the delayed queue,
 *	in that order.
 *
 *	Returns TRUE if any calls were cancelled.
 */
boolean_t
thread_call_func_cancel(
		thread_call_func_t		func,
		thread_call_param_t		param,
		boolean_t			cancel_all)
{
	boolean_t	result;
	spl_t		s;

	s = splsched();
	thread_call_lock_spin();

	if (cancel_all)
		result = _remove_from_pending_queue(func, param, cancel_all) |
			_remove_from_delayed_queue(func, param, cancel_all);
	else
		result = _remove_from_pending_queue(func, param, cancel_all) ||
			_remove_from_delayed_queue(func, param, cancel_all);

	thread_call_unlock();
	splx(s);

	return (result);
}

/*
 * Allocate a thread call with a given priority.  Importances
 * other than THREAD_CALL_PRIORITY_HIGH will be run in threads
 * with eager preemption enabled (i.e. may be aggressively preempted
 * by higher-priority threads which are not in the normal "urgent" bands).
 */
thread_call_t
thread_call_allocate_with_priority(
		thread_call_func_t		func,
		thread_call_param_t		param0,
		thread_call_priority_t		pri)
{
	thread_call_t call;

	if (pri > THREAD_CALL_PRIORITY_LOW) {
		panic("Invalid pri: %d\n", pri);
	}

	call = thread_call_allocate(func, param0);
	call->tc_pri = pri;

	return call;
}

/*
 *	thread_call_allocate:
 *
 *	Allocate a callout entry.
 */
thread_call_t
thread_call_allocate(
		thread_call_func_t		func,
		thread_call_param_t		param0)
{
	thread_call_t	call = zalloc(thread_call_zone);

	thread_call_setup(call, func, param0);
	call->tc_refs = 1;
	call->tc_flags = THREAD_CALL_ALLOC;

	return (call);
}
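
/*
 * Example lifecycle (illustrative sketch; my_callout, my_context and
 * my_arg are hypothetical client names):
 *
 *	thread_call_t tc;
 *
 *	tc = thread_call_allocate(my_callout, (thread_call_param_t)my_context);
 *	(void) thread_call_enter1(tc, (thread_call_param_t)my_arg);
 *	...
 *	(void) thread_call_cancel(tc);
 *	(void) thread_call_free(tc);
 *
 * thread_call_free() returns FALSE, and frees nothing, if the call is
 * still on a queue; cancel first, or use thread_call_cancel_wait()
 * below to also wait out an in-flight invocation.
 */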

/*
 *	thread_call_free:
 *
 *	Release a callout.  If the callout is currently
 *	executing, it will be freed when all invocations
 *	finish.
 */
boolean_t
thread_call_free(
		thread_call_t		call)
{
	spl_t	s;
	int32_t refs;

	s = splsched();
	thread_call_lock_spin();

	if (call->tc_call.queue != NULL) {
		thread_call_unlock();
		splx(s);

		return (FALSE);
	}

	refs = --call->tc_refs;
	if (refs < 0) {
		panic("Refcount negative: %d\n", refs);
	}

	thread_call_unlock();
	splx(s);

	if (refs == 0) {
		zfree(thread_call_zone, call);
	}

	return (TRUE);
}

/*
 *	thread_call_enter:
 *
 *	Enqueue a callout entry to occur "soon".
 *
 *	Returns TRUE if the call was
 *	already on a queue.
 */
boolean_t
thread_call_enter(
		thread_call_t		call)
{
	boolean_t		result = TRUE;
	thread_call_group_t	group;
	spl_t			s;

	group = thread_call_get_group(call);

	s = splsched();
	thread_call_lock_spin();

	if (call->tc_call.queue != &group->pending_queue) {
		result = _pending_call_enqueue(call, group);
	}

	call->tc_call.param1 = 0;

	thread_call_unlock();
	splx(s);

	return (result);
}

boolean_t
thread_call_enter1(
		thread_call_t			call,
		thread_call_param_t		param1)
{
	boolean_t		result = TRUE;
	thread_call_group_t	group;
	spl_t			s;

	group = thread_call_get_group(call);

	s = splsched();
	thread_call_lock_spin();

	if (call->tc_call.queue != &group->pending_queue) {
		result = _pending_call_enqueue(call, group);
	}

	call->tc_call.param1 = param1;

	thread_call_unlock();
	splx(s);

	return (result);
}

/*
 *	thread_call_enter_delayed:
 *
 *	Enqueue a callout entry to occur
 *	at the stated time.
 *
 *	Returns TRUE if the call was
 *	already on a queue.
 */
boolean_t
thread_call_enter_delayed(
		thread_call_t		call,
		uint64_t		deadline)
{
	boolean_t		result = TRUE;
	thread_call_group_t	group;
	spl_t			s;

	group = thread_call_get_group(call);

	s = splsched();
	thread_call_lock_spin();

	result = _delayed_call_enqueue(call, group, deadline);

	if (queue_first(&group->delayed_queue) == qe(call))
		_set_delayed_call_timer(call, group);

	call->tc_call.param1 = 0;

	thread_call_unlock();
	splx(s);

	return (result);
}

boolean_t
thread_call_enter1_delayed(
		thread_call_t			call,
		thread_call_param_t		param1,
		uint64_t			deadline)
{
	boolean_t		result = TRUE;
	thread_call_group_t	group;
	spl_t			s;
	uint64_t		abstime;

	group = thread_call_get_group(call);

	s = splsched();
	thread_call_lock_spin();
	abstime = mach_absolute_time();

	result = _delayed_call_enqueue(call, group, deadline);

	if (queue_first(&group->delayed_queue) == qe(call))
		_set_delayed_call_timer(call, group);

	call->tc_call.param1 = param1;

	call->ttd = (deadline > abstime) ? (deadline - abstime) : 0;
#if CONFIG_DTRACE
	DTRACE_TMR4(thread_callout__create, thread_call_func_t, call->tc_call.func, 0, (call->ttd >> 32), (unsigned) (call->ttd & 0xFFFFFFFF));
#endif
	thread_call_unlock();
	splx(s);

	return (result);
}
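
/*
 * Example (sketch): re-arming a preallocated call 50 ms out with a
 * per-invocation argument.  "tc" and "my_arg" are hypothetical client
 * variables; nanoseconds_to_absolutetime() converts the interval to
 * absolute-time units.
 *
 *	uint64_t interval, deadline;
 *
 *	nanoseconds_to_absolutetime(50 * NSEC_PER_MSEC, &interval);
 *	deadline = mach_absolute_time() + interval;
 *	(void) thread_call_enter1_delayed(tc, (thread_call_param_t)my_arg, deadline);
 */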

/*
 *	thread_call_cancel:
 *
 *	Dequeue a callout entry.
 *
 *	Returns TRUE if the call was
 *	on a queue.
 */
boolean_t
thread_call_cancel(
		thread_call_t		call)
{
	boolean_t		result;
	thread_call_group_t	group;
	spl_t			s;

	group = thread_call_get_group(call);

	s = splsched();
	thread_call_lock_spin();

	result = _call_dequeue(call, group);

	thread_call_unlock();
	splx(s);
#if CONFIG_DTRACE
	DTRACE_TMR4(thread_callout__cancel, thread_call_func_t, call->tc_call.func, 0, (call->ttd >> 32), (unsigned) (call->ttd & 0xFFFFFFFF));
#endif

	return (result);
}

/*
 * Cancel a thread call.  If it cannot be cancelled (i.e.
 * is already in flight), waits for the most recent invocation
 * to finish.  Note that if clients re-submit this thread call,
 * it may still be pending or in flight when thread_call_cancel_wait
 * returns, but all requests to execute this work item prior
 * to the call to thread_call_cancel_wait will have finished.
 */
boolean_t
thread_call_cancel_wait(
		thread_call_t		call)
{
	boolean_t		result;
	thread_call_group_t	group;

	if ((call->tc_flags & THREAD_CALL_ALLOC) == 0) {
		panic("%s: Can't wait on thread call whose storage I don't own.", __FUNCTION__);
	}

	group = thread_call_get_group(call);

	(void) splsched();
	thread_call_lock_spin();

	result = _call_dequeue(call, group);
	if (result == FALSE) {
		thread_call_wait_locked(call);
	}

	thread_call_unlock();
	(void) spllo();

	return result;
}
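
/*
 * Example teardown pattern (sketch): ensuring that no previously
 * submitted invocation of a heap-allocated call is still pending or
 * running before tearing down the state it references.  "tc" is a
 * hypothetical thread_call_t obtained from thread_call_allocate().
 *
 *	(void) thread_call_cancel_wait(tc);
 *	(void) thread_call_free(tc);
 *
 * After thread_call_cancel_wait() returns, all submissions made before
 * the call have finished (unless the callout is concurrently
 * re-submitted), so it is safe to release resources the callout used.
 */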


#ifndef	__LP64__

/*
 *	thread_call_is_delayed:
 *
 *	Returns TRUE if the call is
 *	currently on a delayed queue.
 *
 *	Optionally returns the expiration time.
 */
boolean_t
thread_call_is_delayed(
	thread_call_t		call,
	uint64_t		*deadline)
{
	boolean_t			result = FALSE;
	thread_call_group_t		group;
	spl_t				s;

	group = thread_call_get_group(call);

	s = splsched();
	thread_call_lock_spin();

	if (call->tc_call.queue == &group->delayed_queue) {
		if (deadline != NULL)
			*deadline = call->tc_call.deadline;
		result = TRUE;
	}

	thread_call_unlock();
	splx(s);

	return (result);
}

#endif	/* __LP64__ */

/*
 *	thread_call_wake:
 *
 *	Wake a call thread to service
 *	pending call entries.  May wake
 *	the daemon thread in order to
 *	create additional call threads.
 *
 *	Called with thread_call_lock held.
 *
 *	For high-priority group, only does wakeup/creation if there are no threads
 *	running.
 */
static __inline__ void
thread_call_wake(
	thread_call_group_t		group)
{
	/*
	 * New behavior: use threads if you've got 'em.
	 * Traditional behavior: wake only if no threads running.
	 */
	if (group_isparallel(group) || group->active_count == 0) {
		if (wait_queue_wakeup_one(&group->idle_wqueue, NO_EVENT, THREAD_AWAKENED, -1) == KERN_SUCCESS) {
			group->idle_count--; group->active_count++;

			if (group->idle_count == 0) {
				timer_call_cancel(&group->dealloc_timer);
				group->flags &= ~TCG_DEALLOC_ACTIVE;
			}
		} else {
			if (!thread_call_daemon_awake && thread_call_group_should_add_thread(group)) {
				thread_call_daemon_awake = TRUE;
				wait_queue_wakeup_one(&daemon_wqueue, NO_EVENT, THREAD_AWAKENED, -1);
			}
		}
	}
}

/*
 *	sched_call_thread:
 *
 *	Call out invoked by the scheduler.  Used only for high-priority
 *	thread call group.
 */
static void
sched_call_thread(
		int				type,
		__unused	thread_t		thread)
{
	thread_call_group_t		group;

	group = &thread_call_groups[THREAD_CALL_PRIORITY_HIGH]; /* XXX */

	thread_call_lock_spin();

	switch (type) {

		case SCHED_CALL_BLOCK:
			--group->active_count;
			if (group->pending_count > 0)
				thread_call_wake(group);
			break;

		case SCHED_CALL_UNBLOCK:
			group->active_count++;
			break;
	}

	thread_call_unlock();
}

/*
 * Interrupts disabled, lock held; returns the same way.
 * Only called on thread calls whose storage we own.  Wakes up
 * anyone who might be waiting on this work item and frees it
 * if the client has so requested.
 */
static void
thread_call_finish(thread_call_t call)
{
	boolean_t dowake = FALSE;

	call->tc_finish_count++;
	call->tc_refs--;

	if ((call->tc_flags & THREAD_CALL_WAIT) != 0) {
		dowake = TRUE;
		call->tc_flags &= ~THREAD_CALL_WAIT;

		/*
		 * Dropping lock here because the sched call for the
		 * high-pri group can take the big lock from under
		 * a thread lock.
		 */
		thread_call_unlock();
		thread_wakeup((event_t)call);
		thread_call_lock_spin();
	}

	if (call->tc_refs == 0) {
		if (dowake) {
			panic("Someone waiting on a thread call that is scheduled for free: %p\n", call->tc_call.func);
		}

		enable_ints_and_unlock();

		zfree(thread_call_zone, call);

		(void)disable_ints_and_lock();
	}

}

/*
 *	thread_call_thread:
 */
static void
thread_call_thread(
		thread_call_group_t		group,
		wait_result_t			wres)
{
	thread_t	self = current_thread();
	boolean_t	canwait;

	if ((thread_get_tag_internal(self) & THREAD_TAG_CALLOUT) == 0)
		(void)thread_set_tag_internal(self, THREAD_TAG_CALLOUT);

	/*
	 * A wakeup with THREAD_INTERRUPTED indicates that
	 * we should terminate.
	 */
	if (wres == THREAD_INTERRUPTED) {
		thread_terminate(self);

		/* NOTREACHED */
		panic("thread_terminate() returned?");
	}

	(void)disable_ints_and_lock();

	thread_sched_call(self, group->sched_call);

	while (group->pending_count > 0) {
		thread_call_t			call;
		thread_call_func_t		func;
		thread_call_param_t		param0, param1;

		call = TC(dequeue_head(&group->pending_queue));
		group->pending_count--;

		func = call->tc_call.func;
		param0 = call->tc_call.param0;
		param1 = call->tc_call.param1;

		call->tc_call.queue = NULL;

		_internal_call_release(call);

		/*
		 * Can only do wakeups for thread calls whose storage
		 * we control.
		 */
		if ((call->tc_flags & THREAD_CALL_ALLOC) != 0) {
			canwait = TRUE;
			call->tc_refs++;	/* Delay free until we're done */
		} else
			canwait = FALSE;

		enable_ints_and_unlock();

		KERNEL_DEBUG_CONSTANT(
				MACHDBG_CODE(DBG_MACH_SCHED,MACH_CALLOUT) | DBG_FUNC_NONE,
				VM_KERNEL_UNSLIDE(func), param0, param1, 0, 0);

		(*func)(param0, param1);

		if (get_preemption_level() != 0) {
			int pl = get_preemption_level();
			panic("thread_call_thread: preemption_level %d, last callout %p(%p, %p)",
					pl, (void *)VM_KERNEL_UNSLIDE(func), param0, param1);
		}

		(void)thread_funnel_set(self->funnel_lock, FALSE);		/* XXX */

		(void) disable_ints_and_lock();

		if (canwait) {
			/* Frees if so desired */
			thread_call_finish(call);
		}
	}

	thread_sched_call(self, NULL);
	group->active_count--;

	if (group_isparallel(group)) {
		/*
		 * In a parallel ("new style") group, the thread always blocks
		 * here.  If the group has more threads than its target, this is
		 * the first thread to go idle, and the deallocation timer is not
		 * already armed, arm it so that a thread is reclaimed if the
		 * surplus persists.
		 */
		group->idle_count++;

		if (group->idle_count == 1) {
			group->idle_timestamp = mach_absolute_time();
		}

		if (((group->flags & TCG_DEALLOC_ACTIVE) == 0) &&
				((group->active_count + group->idle_count) > group->target_thread_count)) {
			group->flags |= TCG_DEALLOC_ACTIVE;
			thread_call_start_deallocate_timer(group);
		}

		/* Wait for more work (or termination) */
		wres = wait_queue_assert_wait(&group->idle_wqueue, NO_EVENT, THREAD_INTERRUPTIBLE, 0);
		if (wres != THREAD_WAITING) {
			panic("kcall worker unable to assert wait?");
		}

		enable_ints_and_unlock();

		thread_block_parameter((thread_continue_t)thread_call_thread, group);
	} else {
		if (group->idle_count < group->target_thread_count) {
			group->idle_count++;

			wait_queue_assert_wait(&group->idle_wqueue, NO_EVENT, THREAD_UNINT, 0); /* Interrupted means to exit */

			enable_ints_and_unlock();

			thread_block_parameter((thread_continue_t)thread_call_thread, group);
			/* NOTREACHED */
		}
	}

	enable_ints_and_unlock();

	thread_terminate(self);
	/* NOTREACHED */
}

/*
 *	thread_call_daemon: walk list of groups, allocating
 *	threads if appropriate (as determined by
 *	thread_call_group_should_add_thread()).
 */
static void
thread_call_daemon_continue(__unused void *arg)
{
	int		i;
	kern_return_t	kr;
	thread_call_group_t group;

	(void)disable_ints_and_lock();

	/* Starting at zero happens to be high-priority first. */
	for (i = 0; i < THREAD_CALL_GROUP_COUNT; i++) {
		group = &thread_call_groups[i];
		while (thread_call_group_should_add_thread(group)) {
			group->active_count++;

			enable_ints_and_unlock();

			kr = thread_call_thread_create(group);
			if (kr != KERN_SUCCESS) {
				/*
				 * On failure, just pause for a moment and give up.
				 * We can try again later.
				 */
				delay(10000); /* 10 ms */
				(void)disable_ints_and_lock();
				goto out;
			}

			(void)disable_ints_and_lock();
		}
	}

out:
	thread_call_daemon_awake = FALSE;
	wait_queue_assert_wait(&daemon_wqueue, NO_EVENT, THREAD_UNINT, 0);

	enable_ints_and_unlock();

	thread_block_parameter((thread_continue_t)thread_call_daemon_continue, NULL);
	/* NOTREACHED */
}

static void
thread_call_daemon(
		__unused void	 *arg)
{
	thread_t	self = current_thread();

	self->options |= TH_OPT_VMPRIV;
	vm_page_free_reserve(2);	/* XXX */

	thread_call_daemon_continue(NULL);
	/* NOTREACHED */
}

/*
 * Schedule timer to deallocate a worker thread if we have a surplus
 * of threads (in excess of the group's target) and at least one thread
 * is idle the whole time.
 */
static void
thread_call_start_deallocate_timer(
		thread_call_group_t group)
{
	uint64_t deadline;
	boolean_t onqueue;

	assert(group->idle_count > 0);

	group->flags |= TCG_DEALLOC_ACTIVE;
	deadline = group->idle_timestamp + thread_call_dealloc_interval_abs;
	onqueue = timer_call_enter(&group->dealloc_timer, deadline, 0);

	if (onqueue) {
		panic("Deallocate timer already active?");
	}
}

void
thread_call_delayed_timer(
		timer_call_param_t		p0,
		__unused timer_call_param_t	p1
)
{
	thread_call_t			call;
	thread_call_group_t		group = p0;
	uint64_t			timestamp;

	thread_call_lock_spin();

	timestamp = mach_absolute_time();

	call = TC(queue_first(&group->delayed_queue));

	while (!queue_end(&group->delayed_queue, qe(call))) {
		if (call->tc_call.deadline <= timestamp) {
			_pending_call_enqueue(call, group);
		}
		else
			break;

		call = TC(queue_first(&group->delayed_queue));
	}

	if (!queue_end(&group->delayed_queue, qe(call)))
		_set_delayed_call_timer(call, group);

	thread_call_unlock();
}

/*
 * Timer callback to tell a thread to terminate if
 * we have an excess of threads and at least one has been
 * idle for a long time.
 */
static void
thread_call_dealloc_timer(
		timer_call_param_t		p0,
		__unused timer_call_param_t	p1)
{
	thread_call_group_t group = (thread_call_group_t)p0;
	uint64_t now;
	kern_return_t res;
	boolean_t terminated = FALSE;

	thread_call_lock_spin();

	now = mach_absolute_time();
	if (group->idle_count > 0) {
		if (now > group->idle_timestamp + thread_call_dealloc_interval_abs) {
			terminated = TRUE;
			group->idle_count--;
			res = wait_queue_wakeup_one(&group->idle_wqueue, NO_EVENT, THREAD_INTERRUPTED, -1);
			if (res != KERN_SUCCESS) {
				panic("Unable to wake up idle thread for termination?");
			}
		}

	}

	/*
	 * If we still have an excess of threads, schedule another
	 * invocation of this function.
	 */
	if (group->idle_count > 0 && (group->idle_count + group->active_count > group->target_thread_count)) {
		/*
		 * If we killed someone just now, push out the
		 * next deadline.
		 */
		if (terminated) {
			group->idle_timestamp = now;
		}

		thread_call_start_deallocate_timer(group);
	} else {
		group->flags &= ~TCG_DEALLOC_ACTIVE;
	}

	thread_call_unlock();
}

/*
 * Wait for all requested invocations of a thread call prior to now
 * to finish.  Can only be invoked on thread calls whose storage we manage.
 * Just waits for the finish count to catch up to the submit count we find
 * at the beginning of our wait.
 */
static void
thread_call_wait_locked(thread_call_t call)
{
	uint64_t submit_count;
	wait_result_t res;

	assert(call->tc_flags & THREAD_CALL_ALLOC);

	submit_count = call->tc_submit_count;

	while (call->tc_finish_count < submit_count) {
		call->tc_flags |= THREAD_CALL_WAIT;

		res = assert_wait(call, THREAD_UNINT);
		if (res != THREAD_WAITING) {
			panic("Unable to assert wait?");
		}

		thread_call_unlock();
		(void) spllo();

		res = thread_block(NULL);
		if (res != THREAD_AWAKENED) {
			panic("Awoken with %d?", res);
		}

		(void) splsched();
		thread_call_lock_spin();
	}
}

/*
 * Determine whether a thread call is either on a queue or
 * currently being executed.
 */
boolean_t
thread_call_isactive(thread_call_t call)
{
	boolean_t active;

	disable_ints_and_lock();
	active = (call->tc_submit_count > call->tc_finish_count);
	enable_ints_and_unlock();

	return active;
}