1/*	$NetBSD: task.c,v 1.1 2024/02/18 20:57:50 christos Exp $	*/
2
3/*
4 * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
5 *
6 * SPDX-License-Identifier: MPL-2.0
7 *
8 * This Source Code Form is subject to the terms of the Mozilla Public
9 * License, v. 2.0. If a copy of the MPL was not distributed with this
10 * file, you can obtain one at https://mozilla.org/MPL/2.0/.
11 *
12 * See the COPYRIGHT file distributed with this work for additional
13 * information regarding copyright ownership.
14 */
15
16/*! \file */
17
18/*
19 * XXXRTH  Need to document the states a task can be in, and the rules
20 * for changing states.
21 */
22
23#include <stdbool.h>
24#include <unistd.h>
25
26#include <isc/app.h>
27#include <isc/atomic.h>
28#include <isc/condition.h>
29#include <isc/event.h>
30#include <isc/log.h>
31#include <isc/magic.h>
32#include <isc/mem.h>
33#include <isc/once.h>
34#include <isc/platform.h>
35#include <isc/print.h>
36#include <isc/random.h>
37#include <isc/refcount.h>
38#include <isc/string.h>
39#include <isc/task.h>
40#include <isc/thread.h>
41#include <isc/time.h>
42#include <isc/util.h>
43
44#ifdef HAVE_LIBXML2
45#include <libxml/xmlwriter.h>
46#define ISC_XMLCHAR (const xmlChar *)
47#endif /* HAVE_LIBXML2 */
48
49#ifdef HAVE_JSON_C
50#include <json_object.h>
51#endif /* HAVE_JSON_C */
52
53#include "task_p.h"
54
55/*
56 * Task manager is built around 'as little locking as possible' concept.
57 * Each thread has his own queue of tasks to be run, if a task is in running
58 * state it will stay on the runner it's currently on, if a task is in idle
59 * state it can be woken up on a specific runner with isc_task_sendto - that
60 * helps with data locality on CPU.
61 *
62 * To make load even some tasks (from task pools) are bound to specific
63 * queues using isc_task_create_bound. This way load balancing between
64 * CPUs/queues happens on the higher layer.
65 */
66
67#ifdef ISC_TASK_TRACE
68#define XTRACE(m) \
69	fprintf(stderr, "task %p thread %zu: %s\n", task, isc_tid_v, (m))
70#define XTTRACE(t, m) \
71	fprintf(stderr, "task %p thread %zu: %s\n", (t), isc_tid_v, (m))
72#define XTHREADTRACE(m) fprintf(stderr, "thread %zu: %s\n", isc_tid_v, (m))
73#else /* ifdef ISC_TASK_TRACE */
74#define XTRACE(m)
75#define XTTRACE(t, m)
76#define XTHREADTRACE(m)
77#endif /* ifdef ISC_TASK_TRACE */
78
79/***
80 *** Types.
81 ***/
82
83typedef enum {
84	task_state_idle,    /* not doing anything, events queue empty */
85	task_state_ready,   /* waiting in worker's queue */
86	task_state_paused,  /* not running, paused */
87	task_state_pausing, /* running, waiting to be paused */
88	task_state_running, /* actively processing events */
89	task_state_done	    /* shutting down, no events or references */
90} task_state_t;
91
92#if defined(HAVE_LIBXML2) || defined(HAVE_JSON_C)
93static const char *statenames[] = {
94	"idle", "ready", "paused", "pausing", "running", "done",
95};
96#endif /* if defined(HAVE_LIBXML2) || defined(HAVE_JSON_C) */
97
98#define TASK_MAGIC    ISC_MAGIC('T', 'A', 'S', 'K')
99#define VALID_TASK(t) ISC_MAGIC_VALID(t, TASK_MAGIC)
100
101struct isc_task {
102	/* Not locked. */
103	unsigned int magic;
104	isc_taskmgr_t *manager;
105	isc_mutex_t lock;
106	/* Locked by task lock. */
107	int threadid;
108	task_state_t state;
109	int pause_cnt;
110	isc_refcount_t references;
111	isc_refcount_t running;
112	isc_eventlist_t events;
113	isc_eventlist_t on_shutdown;
114	unsigned int nevents;
115	unsigned int quantum;
116	isc_stdtime_t now;
117	isc_time_t tnow;
118	char name[16];
119	void *tag;
120	bool bound;
121	/* Protected by atomics */
122	atomic_bool shuttingdown;
123	atomic_bool privileged;
124	/* Locked by task manager lock. */
125	LINK(isc_task_t) link;
126};
127
128#define TASK_SHUTTINGDOWN(t) (atomic_load_acquire(&(t)->shuttingdown))
129#define TASK_PRIVILEGED(t)   (atomic_load_acquire(&(t)->privileged))
130
131#define TASK_MANAGER_MAGIC ISC_MAGIC('T', 'S', 'K', 'M')
132#define VALID_MANAGER(m)   ISC_MAGIC_VALID(m, TASK_MANAGER_MAGIC)
133
134struct isc_taskmgr {
135	/* Not locked. */
136	unsigned int magic;
137	isc_refcount_t references;
138	isc_mem_t *mctx;
139	isc_mutex_t lock;
140	atomic_uint_fast32_t tasks_count;
141	isc_nm_t *netmgr;
142
143	/* Locked by task manager lock. */
144	unsigned int default_quantum;
145	LIST(isc_task_t) tasks;
146	atomic_uint_fast32_t mode;
147	atomic_bool exclusive_req;
148	bool exiting;
149	isc_task_t *excl;
150};
151
152#define DEFAULT_DEFAULT_QUANTUM 25
153
154/*%
155 * The following are intended for internal use (indicated by "isc__"
156 * prefix) but are not declared as static, allowing direct access from
157 * unit tests etc.
158 */
159
160bool
161isc_task_purgeevent(isc_task_t *task, isc_event_t *event);
162void
163isc_taskmgr_setexcltask(isc_taskmgr_t *mgr, isc_task_t *task);
164isc_result_t
165isc_taskmgr_excltask(isc_taskmgr_t *mgr, isc_task_t **taskp);
166
167/***
168 *** Tasks.
169 ***/
170
/*
 * Tear down a task whose final run has completed.
 *
 * Called with no locks held, after the last running reference has been
 * dropped and the task has reached task_state_done.  Unlinks the task
 * from the manager's list, frees its resources, and drops the task's
 * reference on the manager (which may free the manager).
 */
static void
task_finished(isc_task_t *task) {
	isc_taskmgr_t *manager = task->manager;
	isc_mem_t *mctx = manager->mctx;
	REQUIRE(EMPTY(task->events));
	REQUIRE(task->nevents == 0);
	REQUIRE(EMPTY(task->on_shutdown));
	REQUIRE(task->state == task_state_done);

	XTRACE("task_finished");

	/* Both counters must already be zero here. */
	isc_refcount_destroy(&task->running);
	isc_refcount_destroy(&task->references);

	LOCK(&manager->lock);
	UNLINK(manager->tasks, task, link);
	atomic_fetch_sub(&manager->tasks_count, 1);
	UNLOCK(&manager->lock);

	isc_mutex_destroy(&task->lock);
	/* Invalidate the magic before freeing to catch use-after-free. */
	task->magic = 0;
	isc_mem_put(mctx, task, sizeof(*task));

	isc_taskmgr_detach(&manager);
}
196
197isc_result_t
198isc_task_create(isc_taskmgr_t *manager, unsigned int quantum,
199		isc_task_t **taskp) {
200	return (isc_task_create_bound(manager, quantum, taskp, -1));
201}
202
/*
 * Create a task, optionally pinned to thread 'threadid' (-1 = unbound).
 *
 * 'quantum' is the number of events the task may dispatch per run;
 * 0 means use the manager's default.  On success '*taskp' holds one
 * reference to the new task.  Returns ISC_R_SHUTTINGDOWN if the
 * manager is already exiting.
 */
isc_result_t
isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum,
		      isc_task_t **taskp, int threadid) {
	isc_task_t *task = NULL;
	bool exiting;

	REQUIRE(VALID_MANAGER(manager));
	REQUIRE(taskp != NULL && *taskp == NULL);

	XTRACE("isc_task_create");

	task = isc_mem_get(manager->mctx, sizeof(*task));
	*task = (isc_task_t){ 0 };

	isc_taskmgr_attach(manager, &task->manager);

	if (threadid == -1) {
		/*
		 * Task is not pinned to a queue, it's threadid will be
		 * chosen when first task will be sent to it - either
		 * randomly or specified by isc_task_sendto.
		 */
		task->bound = false;
		task->threadid = -1;
	} else {
		/*
		 * Task is pinned to a queue, it'll always be run
		 * by a specific thread.
		 */
		task->bound = true;
		task->threadid = threadid;
	}

	isc_mutex_init(&task->lock);
	task->state = task_state_idle;
	task->pause_cnt = 0;

	/* The caller's reference; 'running' counts pending runs. */
	isc_refcount_init(&task->references, 1);
	isc_refcount_init(&task->running, 0);
	INIT_LIST(task->events);
	INIT_LIST(task->on_shutdown);
	task->nevents = 0;
	task->quantum = (quantum > 0) ? quantum : manager->default_quantum;
	atomic_init(&task->shuttingdown, false);
	atomic_init(&task->privileged, false);
	task->now = 0;
	isc_time_settoepoch(&task->tnow);
	memset(task->name, 0, sizeof(task->name));
	task->tag = NULL;
	INIT_LINK(task, link);
	task->magic = TASK_MAGIC;

	/*
	 * Check 'exiting' and link the task under the same lock so a
	 * task can never be added to a manager that is shutting down.
	 */
	LOCK(&manager->lock);
	exiting = manager->exiting;
	if (!exiting) {
		APPEND(manager->tasks, task, link);
		atomic_fetch_add(&manager->tasks_count, 1);
	}
	UNLOCK(&manager->lock);

	if (exiting) {
		/* Roll back everything initialized above. */
		isc_refcount_destroy(&task->running);
		isc_refcount_decrement(&task->references);
		isc_refcount_destroy(&task->references);
		isc_mutex_destroy(&task->lock);
		isc_taskmgr_detach(&task->manager);
		isc_mem_put(manager->mctx, task, sizeof(*task));
		return (ISC_R_SHUTTINGDOWN);
	}

	*taskp = task;

	return (ISC_R_SUCCESS);
}
277
278void
279isc_task_attach(isc_task_t *source, isc_task_t **targetp) {
280	/*
281	 * Attach *targetp to source.
282	 */
283
284	REQUIRE(VALID_TASK(source));
285	REQUIRE(targetp != NULL && *targetp == NULL);
286
287	XTTRACE(source, "isc_task_attach");
288
289	isc_refcount_increment(&source->references);
290
291	*targetp = source;
292}
293
/*
 * Mark 'task' as shutting down and post its shutdown events.
 *
 * Caller must be holding the task's lock.
 *
 * Returns true if the task was idle and has now been marked ready, in
 * which case the caller must schedule it with task_ready() after
 * dropping the lock.
 */
static bool
task_shutdown(isc_task_t *task) {
	bool was_idle = false;
	isc_event_t *event, *prev;

	XTRACE("task_shutdown");

	/* Only the first caller flips the flag and posts the events. */
	if (atomic_compare_exchange_strong(&task->shuttingdown,
					   &(bool){ false }, true))
	{
		XTRACE("shutting down");
		if (task->state == task_state_idle) {
			INSIST(EMPTY(task->events));
			task->state = task_state_ready;
			was_idle = true;
		}
		INSIST(task->state == task_state_ready ||
		       task->state == task_state_paused ||
		       task->state == task_state_pausing ||
		       task->state == task_state_running);

		/*
		 * Note that we post shutdown events LIFO.
		 */
		for (event = TAIL(task->on_shutdown); event != NULL;
		     event = prev)
		{
			prev = PREV(event, ev_link);
			DEQUEUE(task->on_shutdown, event, ev_link);
			ENQUEUE(task->events, event, ev_link);
			task->nevents++;
		}
	}

	return (was_idle);
}
334
335/*
336 * Moves a task onto the appropriate run queue.
337 *
338 * Caller must NOT hold queue lock.
339 */
340static void
341task_ready(isc_task_t *task) {
342	isc_taskmgr_t *manager = task->manager;
343	REQUIRE(VALID_MANAGER(manager));
344
345	XTRACE("task_ready");
346
347	isc_refcount_increment0(&task->running);
348	LOCK(&task->lock);
349	isc_nm_task_enqueue(manager->netmgr, task, task->threadid);
350	UNLOCK(&task->lock);
351}
352
void
isc_task_ready(isc_task_t *task) {
	/* Public wrapper: enqueue 'task' on its run queue. */
	task_ready(task);
}
357
/*
 * Drop one reference to 'task'.
 *
 * Caller must be holding the task lock.
 *
 * Returns true if this was the last reference and the task is idle;
 * the task has then been marked ready, and the caller must schedule it
 * with task_ready() so the run loop can shut it down and free it.
 */
static bool
task_detach(isc_task_t *task) {
	XTRACE("detach");

	if (isc_refcount_decrement(&task->references) == 1 &&
	    task->state == task_state_idle)
	{
		INSIST(EMPTY(task->events));
		/*
		 * There are no references to this task, and no
		 * pending events.  We could try to optimize and
		 * either initiate shutdown or clean up the task,
		 * depending on its state, but it's easier to just
		 * make the task ready and allow run() or the event
		 * loop to deal with shutting down and termination.
		 */
		task->state = task_state_ready;
		return (true);
	}

	return (false);
}
384
385void
386isc_task_detach(isc_task_t **taskp) {
387	isc_task_t *task;
388	bool was_idle;
389
390	/*
391	 * Detach *taskp from its task.
392	 */
393
394	REQUIRE(taskp != NULL);
395	task = *taskp;
396	REQUIRE(VALID_TASK(task));
397
398	XTRACE("isc_task_detach");
399
400	LOCK(&task->lock);
401	was_idle = task_detach(task);
402	UNLOCK(&task->lock);
403
404	if (was_idle) {
405		task_ready(task);
406	}
407
408	*taskp = NULL;
409}
410
/*
 * Queue '*eventp' on 'task', preferring thread 'c' (-1 = any).
 *
 * Caller must be holding the task lock.  Consumes '*eventp' (NULLing
 * it).  Returns true if the task was idle and has been marked ready;
 * the caller must then schedule it with task_ready() after releasing
 * the lock.
 */
static bool
task_send(isc_task_t *task, isc_event_t **eventp, int c) {
	bool was_idle = false;
	isc_event_t *event;

	REQUIRE(eventp != NULL);
	event = *eventp;
	*eventp = NULL;
	REQUIRE(event != NULL);
	REQUIRE(event->ev_type > 0);
	REQUIRE(task->state != task_state_done);
	REQUIRE(!ISC_LINK_LINKED(event, ev_ratelink));

	XTRACE("task_send");

	/* Bound tasks always run on their fixed thread. */
	if (task->bound) {
		c = task->threadid;
	} else if (c < 0) {
		c = -1;
	}

	if (task->state == task_state_idle) {
		/* Waking an idle task: it adopts the requested thread. */
		was_idle = true;
		task->threadid = c;
		INSIST(EMPTY(task->events));
		task->state = task_state_ready;
	}
	INSIST(task->state == task_state_ready ||
	       task->state == task_state_running ||
	       task->state == task_state_paused ||
	       task->state == task_state_pausing);
	ENQUEUE(task->events, event, ev_link);
	task->nevents++;

	return (was_idle);
}
451
void
isc_task_send(isc_task_t *task, isc_event_t **eventp) {
	/* Send with no thread affinity; the scheduler picks a queue. */
	isc_task_sendto(task, eventp, -1);
}
456
void
isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) {
	/* Send with no thread affinity, then drop the caller's reference. */
	isc_task_sendtoanddetach(taskp, eventp, -1);
}
461
/*
 * Send '*eventp' to 'task', preferring thread 'c'.  Consumes the
 * caller's event reference ('*eventp' is NULLed).
 */
void
isc_task_sendto(isc_task_t *task, isc_event_t **eventp, int c) {
	bool was_idle;

	REQUIRE(VALID_TASK(task));
	XTRACE("isc_task_send");

	/*
	 * We're trying hard to hold locks for as short a time as possible.
	 * We're also trying to hold as few locks as possible.  This is why
	 * some processing is deferred until after the lock is released.
	 */
	LOCK(&task->lock);
	was_idle = task_send(task, eventp, c);
	UNLOCK(&task->lock);

	if (was_idle) {
		/*
		 * We need to add this task to the ready queue.
		 *
		 * We've waited until now to do it because making a task
		 * ready requires locking the manager.  If we tried to do
		 * this while holding the task lock, we could deadlock.
		 *
		 * We've changed the state to ready, so no one else will
		 * be trying to add this task to the ready queue.  The
		 * only way to leave the ready state is by executing the
		 * task.  It thus doesn't matter if events are added,
		 * removed, or a shutdown is started in the interval
		 * between the time we released the task lock, and the time
		 * we add the task to the ready queue.
		 */
		task_ready(task);
	}
}
501
/*
 * Send '*eventp' to '*taskp' (preferring thread 'c'), then drop the
 * caller's task reference.  Both '*eventp' and '*taskp' are consumed.
 */
void
isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) {
	bool idle1, idle2;
	isc_task_t *task;

	REQUIRE(taskp != NULL);
	task = *taskp;
	REQUIRE(VALID_TASK(task));
	XTRACE("isc_task_sendanddetach");

	LOCK(&task->lock);
	idle1 = task_send(task, eventp, c);
	idle2 = task_detach(task);
	UNLOCK(&task->lock);

	/*
	 * If idle1, then idle2 shouldn't be true as well since we're holding
	 * the task lock, and thus the task cannot switch from ready back to
	 * idle.
	 */
	INSIST(!(idle1 && idle2));

	if (idle1 || idle2) {
		task_ready(task);
	}

	*taskp = NULL;
}
535
536#define PURGE_OK(event) (((event)->ev_attributes & ISC_EVENTATTR_NOPURGE) == 0)
537
/*
 * Move matching events from 'task's queue onto 'events'.
 *
 * Events matching 'sender', whose type is >= first and <= last, and
 * whose tag is 'tag' will be dequeued.  If 'purging', matching events
 * which are marked as unpurgable will not be dequeued.
 *
 * sender == NULL means "any sender", and tag == NULL means "any tag".
 *
 * Returns the number of events moved.
 */
static unsigned int
dequeue_events(isc_task_t *task, void *sender, isc_eventtype_t first,
	       isc_eventtype_t last, void *tag, isc_eventlist_t *events,
	       bool purging) {
	isc_event_t *event, *next_event;
	unsigned int count = 0;

	REQUIRE(VALID_TASK(task));
	REQUIRE(last >= first);

	XTRACE("dequeue_events");

	LOCK(&task->lock);

	for (event = HEAD(task->events); event != NULL; event = next_event) {
		/* Save the successor now: 'event' may be unlinked below. */
		next_event = NEXT(event, ev_link);
		if (event->ev_type >= first && event->ev_type <= last &&
		    (sender == NULL || event->ev_sender == sender) &&
		    (tag == NULL || event->ev_tag == tag) &&
		    (!purging || PURGE_OK(event)))
		{
			DEQUEUE(task->events, event, ev_link);
			task->nevents--;
			ENQUEUE(*events, event, ev_link);
			count++;
		}
	}

	UNLOCK(&task->lock);

	return (count);
}
578
579unsigned int
580isc_task_purgerange(isc_task_t *task, void *sender, isc_eventtype_t first,
581		    isc_eventtype_t last, void *tag) {
582	unsigned int count;
583	isc_eventlist_t events;
584	isc_event_t *event, *next_event;
585	REQUIRE(VALID_TASK(task));
586
587	/*
588	 * Purge events from a task's event queue.
589	 */
590
591	XTRACE("isc_task_purgerange");
592
593	ISC_LIST_INIT(events);
594
595	count = dequeue_events(task, sender, first, last, tag, &events, true);
596
597	for (event = HEAD(events); event != NULL; event = next_event) {
598		next_event = NEXT(event, ev_link);
599		ISC_LIST_UNLINK(events, event, ev_link);
600		isc_event_free(&event);
601	}
602
603	/*
604	 * Note that purging never changes the state of the task.
605	 */
606
607	return (count);
608}
609
unsigned int
isc_task_purge(isc_task_t *task, void *sender, isc_eventtype_t type,
	       void *tag) {
	/*
	 * Purge matching events of a single 'type' from a task's event
	 * queue; a one-type purge is a range purge with first == last.
	 * Returns the number of events freed.
	 */
	REQUIRE(VALID_TASK(task));

	XTRACE("isc_task_purge");

	return (isc_task_purgerange(task, sender, type, type, tag));
}
622
623bool
624isc_task_purgeevent(isc_task_t *task, isc_event_t *event) {
625	bool found = false;
626
627	/*
628	 * Purge 'event' from a task's event queue.
629	 */
630
631	REQUIRE(VALID_TASK(task));
632
633	/*
634	 * If 'event' is on the task's event queue, it will be purged,
635	 * unless it is marked as unpurgeable.  'event' does not have to be
636	 * on the task's event queue; in fact, it can even be an invalid
637	 * pointer.  Purging only occurs if the event is actually on the task's
638	 * event queue.
639	 *
640	 * Purging never changes the state of the task.
641	 */
642
643	LOCK(&task->lock);
644	if (ISC_LINK_LINKED(event, ev_link)) {
645		DEQUEUE(task->events, event, ev_link);
646		task->nevents--;
647		found = true;
648	}
649	UNLOCK(&task->lock);
650
651	if (!found) {
652		return (false);
653	}
654
655	isc_event_free(&event);
656
657	return (true);
658}
659
unsigned int
isc_task_unsendrange(isc_task_t *task, void *sender, isc_eventtype_t first,
		     isc_eventtype_t last, void *tag, isc_eventlist_t *events) {
	/*
	 * Move events with type in [first, last] matching 'sender' and
	 * 'tag' from the task's queue onto 'events' WITHOUT freeing
	 * them.  Unlike purging, unpurgeable events are removed too.
	 * Returns the number of events moved.
	 */
	REQUIRE(VALID_TASK(task));

	XTRACE("isc_task_unsendrange");

	return (dequeue_events(task, sender, first, last, tag, events, false));
}
672
673unsigned int
674isc_task_unsend(isc_task_t *task, void *sender, isc_eventtype_t type, void *tag,
675		isc_eventlist_t *events) {
676	/*
677	 * Remove events from a task's event queue.
678	 */
679
680	XTRACE("isc_task_unsend");
681
682	return (dequeue_events(task, sender, type, type, tag, events, false));
683}
684
isc_result_t
isc_task_onshutdown(isc_task_t *task, isc_taskaction_t action, void *arg) {
	bool disallowed = false;
	isc_result_t result = ISC_R_SUCCESS;
	isc_event_t *event;

	/*
	 * Send a shutdown event with action 'action' and argument 'arg' when
	 * 'task' is shutdown.  Returns ISC_R_SHUTTINGDOWN (and posts
	 * nothing) if shutdown has already begun.
	 */

	REQUIRE(VALID_TASK(task));
	REQUIRE(action != NULL);

	/* Allocate up front so the task lock is held only to enqueue. */
	event = isc_event_allocate(task->manager->mctx, NULL,
				   ISC_TASKEVENT_SHUTDOWN, action, arg,
				   sizeof(*event));

	/*
	 * NOTE(review): the shuttingdown flag is tested without holding
	 * the task lock, so a task_shutdown() between this test and the
	 * ENQUEUE appears possible -- confirm callers tolerate that
	 * window.
	 */
	if (TASK_SHUTTINGDOWN(task)) {
		disallowed = true;
		result = ISC_R_SHUTTINGDOWN;
	} else {
		LOCK(&task->lock);
		ENQUEUE(task->on_shutdown, event, ev_link);
		UNLOCK(&task->lock);
	}

	if (disallowed) {
		isc_mem_put(task->manager->mctx, event, sizeof(*event));
	}

	return (result);
}
718
719void
720isc_task_shutdown(isc_task_t *task) {
721	bool was_idle;
722
723	/*
724	 * Shutdown 'task'.
725	 */
726
727	REQUIRE(VALID_TASK(task));
728
729	LOCK(&task->lock);
730	was_idle = task_shutdown(task);
731	UNLOCK(&task->lock);
732
733	if (was_idle) {
734		task_ready(task);
735	}
736}
737
void
isc_task_destroy(isc_task_t **taskp) {
	/*
	 * Destroy '*taskp': shut it down and drop the caller's
	 * reference; the run loop frees the task once its remaining
	 * events have been handled.
	 */

	REQUIRE(taskp != NULL);

	isc_task_shutdown(*taskp);
	isc_task_detach(taskp);
}
749
750void
751isc_task_setname(isc_task_t *task, const char *name, void *tag) {
752	/*
753	 * Name 'task'.
754	 */
755
756	REQUIRE(VALID_TASK(task));
757
758	LOCK(&task->lock);
759	strlcpy(task->name, name, sizeof(task->name));
760	task->tag = tag;
761	UNLOCK(&task->lock);
762}
763
764const char *
765isc_task_getname(isc_task_t *task) {
766	REQUIRE(VALID_TASK(task));
767
768	return (task->name);
769}
770
771void *
772isc_task_gettag(isc_task_t *task) {
773	REQUIRE(VALID_TASK(task));
774
775	return (task->tag);
776}
777
778void
779isc_task_getcurrenttime(isc_task_t *task, isc_stdtime_t *t) {
780	REQUIRE(VALID_TASK(task));
781	REQUIRE(t != NULL);
782
783	LOCK(&task->lock);
784	*t = task->now;
785	UNLOCK(&task->lock);
786}
787
788void
789isc_task_getcurrenttimex(isc_task_t *task, isc_time_t *t) {
790	REQUIRE(VALID_TASK(task));
791	REQUIRE(t != NULL);
792
793	LOCK(&task->lock);
794	*t = task->tnow;
795	UNLOCK(&task->lock);
796}
797
798isc_nm_t *
799isc_task_getnetmgr(isc_task_t *task) {
800	REQUIRE(VALID_TASK(task));
801
802	return (task->manager->netmgr);
803}
804
805void
806isc_task_setquantum(isc_task_t *task, unsigned int quantum) {
807	REQUIRE(VALID_TASK(task));
808
809	LOCK(&task->lock);
810	task->quantum = (quantum > 0) ? quantum
811				      : task->manager->default_quantum;
812	UNLOCK(&task->lock);
813}
814
815/***
816 *** Task Manager.
817 ***/
818
/*
 * Run one scheduling slot for 'task': dispatch queued events until the
 * queue empties, a pause is requested, or the quantum is exhausted.
 *
 * Called with no locks held.  Returns ISC_R_QUOTA if the quantum
 * expired with events still queued (the task has been put back in the
 * ready state), otherwise ISC_R_SUCCESS.  May free the task if it
 * reaches the done state and this was the last pending run.
 */
static isc_result_t
task_run(isc_task_t *task) {
	unsigned int dispatch_count = 0;
	bool finished = false;
	isc_event_t *event = NULL;
	isc_result_t result = ISC_R_SUCCESS;
	uint32_t quantum;

	REQUIRE(VALID_TASK(task));

	LOCK(&task->lock);
	/* Snapshot: isc_task_setquantum() changes apply to later runs. */
	quantum = task->quantum;

	/*
	 * The task may have been paused after it was queued - we never
	 * hold both the queue lock and the task lock, to avoid
	 * deadlocks - so just bail out in that case.
	 */
	if (task->state != task_state_ready) {
		goto done;
	}

	INSIST(task->state == task_state_ready);
	task->state = task_state_running;
	XTRACE("running");
	XTRACE(task->name);
	/* Cache "now" for isc_task_getcurrenttime()/getcurrenttimex(). */
	TIME_NOW(&task->tnow);
	task->now = isc_time_seconds(&task->tnow);

	while (true) {
		if (!EMPTY(task->events)) {
			event = HEAD(task->events);
			DEQUEUE(task->events, event, ev_link);
			task->nevents--;

			/*
			 * Execute the event action.  The task lock is
			 * dropped around the callback, so the action may
			 * send, purge, pause, or shut down freely.
			 */
			XTRACE("execute action");
			XTRACE(task->name);
			if (event->ev_action != NULL) {
				UNLOCK(&task->lock);
				(event->ev_action)(task, event);
				LOCK(&task->lock);
			}
			XTRACE("execution complete");
			dispatch_count++;
		}

		if (isc_refcount_current(&task->references) == 0 &&
		    EMPTY(task->events) && !TASK_SHUTTINGDOWN(task))
		{
			/*
			 * There are no references and no pending events for
			 * this task, which means it will not become runnable
			 * again via an external action (such as sending an
			 * event or detaching).
			 *
			 * We initiate shutdown to prevent it from becoming a
			 * zombie.
			 *
			 * We do this here instead of in the "if
			 * EMPTY(task->events)" block below because:
			 *
			 *	If we post no shutdown events, we want the task
			 *	to finish.
			 *
			 *	If we did post shutdown events, we will still
			 *	want the task's quantum to be applied.
			 */
			INSIST(!task_shutdown(task));
		}

		if (EMPTY(task->events)) {
			/*
			 * Nothing else to do for this task right now.
			 */
			XTRACE("empty");
			if (isc_refcount_current(&task->references) == 0 &&
			    TASK_SHUTTINGDOWN(task))
			{
				/*
				 * The task is done.
				 */
				XTRACE("done");
				task->state = task_state_done;
			} else {
				if (task->state == task_state_running) {
					XTRACE("idling");
					task->state = task_state_idle;
				} else if (task->state == task_state_pausing) {
					XTRACE("pausing");
					task->state = task_state_paused;
				}
			}
			break;
		} else if (task->state == task_state_pausing) {
			/*
			 * We got a pause request on this task, stop working on
			 * it and switch the state to paused.
			 */
			XTRACE("pausing");
			task->state = task_state_paused;
			break;
		} else if (dispatch_count >= quantum) {
			/*
			 * Our quantum has expired, but there is more work to be
			 * done.  We'll requeue it to the ready queue later.
			 *
			 * We don't check quantum until dispatching at least one
			 * event, so the minimum quantum is one.
			 */
			XTRACE("quantum");
			task->state = task_state_ready;
			result = ISC_R_QUOTA;
			break;
		}
	}

done:
	/* Drop this run's reference; last one out reaps a done task. */
	if (isc_refcount_decrement(&task->running) == 1 &&
	    task->state == task_state_done)
	{
		finished = true;
	}
	UNLOCK(&task->lock);

	if (finished) {
		task_finished(task);
	}

	return (result);
}
952
isc_result_t
isc_task_run(isc_task_t *task) {
	/* Public wrapper: run one scheduling slot of 'task'. */
	return (task_run(task));
}
957
/*
 * Free 'manager' after its last reference is gone: detach the netmgr,
 * then release the lock, magic and memory-context attachment.
 */
static void
manager_free(isc_taskmgr_t *manager) {
	isc_refcount_destroy(&manager->references);
	isc_nm_detach(&manager->netmgr);

	isc_mutex_destroy(&manager->lock);
	/* Invalidate the magic before freeing to catch use-after-free. */
	manager->magic = 0;
	isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager));
}
967
968void
969isc_taskmgr_attach(isc_taskmgr_t *source, isc_taskmgr_t **targetp) {
970	REQUIRE(VALID_MANAGER(source));
971	REQUIRE(targetp != NULL && *targetp == NULL);
972
973	isc_refcount_increment(&source->references);
974
975	*targetp = source;
976}
977
978void
979isc_taskmgr_detach(isc_taskmgr_t **managerp) {
980	REQUIRE(managerp != NULL);
981	REQUIRE(VALID_MANAGER(*managerp));
982
983	isc_taskmgr_t *manager = *managerp;
984	*managerp = NULL;
985
986	if (isc_refcount_decrement(&manager->references) == 1) {
987		manager_free(manager);
988	}
989}
990
991isc_result_t
992isc__taskmgr_create(isc_mem_t *mctx, unsigned int default_quantum, isc_nm_t *nm,
993		    isc_taskmgr_t **managerp) {
994	isc_taskmgr_t *manager;
995
996	/*
997	 * Create a new task manager.
998	 */
999
1000	REQUIRE(managerp != NULL && *managerp == NULL);
1001	REQUIRE(nm != NULL);
1002
1003	manager = isc_mem_get(mctx, sizeof(*manager));
1004	*manager = (isc_taskmgr_t){ .magic = TASK_MANAGER_MAGIC };
1005
1006	isc_mutex_init(&manager->lock);
1007
1008	if (default_quantum == 0) {
1009		default_quantum = DEFAULT_DEFAULT_QUANTUM;
1010	}
1011	manager->default_quantum = default_quantum;
1012
1013	if (nm != NULL) {
1014		isc_nm_attach(nm, &manager->netmgr);
1015	}
1016
1017	INIT_LIST(manager->tasks);
1018	atomic_init(&manager->mode, isc_taskmgrmode_normal);
1019	atomic_init(&manager->exclusive_req, false);
1020	atomic_init(&manager->tasks_count, 0);
1021
1022	isc_mem_attach(mctx, &manager->mctx);
1023
1024	isc_refcount_init(&manager->references, 1);
1025
1026	*managerp = manager;
1027
1028	return (ISC_R_SUCCESS);
1029}
1030
void
isc__taskmgr_shutdown(isc_taskmgr_t *manager) {
	isc_task_t *task;

	REQUIRE(VALID_MANAGER(manager));

	XTHREADTRACE("isc_taskmgr_shutdown");
	/*
	 * Only one non-worker thread may ever call this routine.
	 * If a worker thread wants to initiate shutdown of the
	 * task manager, it should ask some non-worker thread to call
	 * isc_taskmgr_destroy(), e.g. by signalling a condition variable
	 * that the startup thread is sleeping on.
	 */

	/*
	 * Unlike elsewhere, we're going to hold this lock a long time.
	 * We need to do so, because otherwise the list of tasks could
	 * change while we were traversing it.
	 *
	 * This is also the only function where we will hold both the
	 * task manager lock and a task lock at the same time.
	 */
	LOCK(&manager->lock);
	if (manager->excl != NULL) {
		isc_task_detach((isc_task_t **)&manager->excl);
	}

	/*
	 * Make sure we only get called once.
	 */
	INSIST(manager->exiting == false);
	manager->exiting = true;

	/*
	 * Post shutdown event(s) to every task (if they haven't already been
	 * posted).
	 */
	for (task = HEAD(manager->tasks); task != NULL; task = NEXT(task, link))
	{
		bool was_idle;

		LOCK(&task->lock);
		was_idle = task_shutdown(task);
		if (was_idle) {
			/*
			 * NOTE(review): idle tasks are forced onto thread
			 * 0 for their final run -- presumably because an
			 * idle task's threadid may still be -1 (never
			 * scheduled); confirm against isc_nm_task_enqueue.
			 */
			task->threadid = 0;
		}
		UNLOCK(&task->lock);

		if (was_idle) {
			task_ready(task);
		}
	}

	UNLOCK(&manager->lock);
}
1087
void
isc__taskmgr_destroy(isc_taskmgr_t **managerp) {
	REQUIRE(managerp != NULL && VALID_MANAGER(*managerp));
	XTHREADTRACE("isc_taskmgr_destroy");

	/*
	 * Busy-wait (in 10 ms steps) until ours is the only remaining
	 * reference, then drop it.  With ISC_TASK_TRACE we INSIST that
	 * this completes within ~10 seconds instead of hanging forever.
	 */
#ifdef ISC_TASK_TRACE
	int counter = 0;
	while (isc_refcount_current(&(*managerp)->references) > 1 &&
	       counter++ < 1000)
	{
		usleep(10 * 1000);
	}
	INSIST(counter < 1000);
#else
	while (isc_refcount_current(&(*managerp)->references) > 1) {
		usleep(10 * 1000);
	}
#endif

	isc_taskmgr_detach(managerp);
}
1109
void
isc_taskmgr_setexcltask(isc_taskmgr_t *mgr, isc_task_t *task) {
	/*
	 * Designate 'task' as the manager's exclusive task, replacing
	 * (and detaching) any previous one.  The task must be bound to
	 * thread 0.
	 */
	REQUIRE(VALID_MANAGER(mgr));
	REQUIRE(VALID_TASK(task));

	/* The task lock is taken only to read threadid safely. */
	LOCK(&task->lock);
	REQUIRE(task->threadid == 0);
	UNLOCK(&task->lock);

	LOCK(&mgr->lock);
	if (mgr->excl != NULL) {
		isc_task_detach(&mgr->excl);
	}
	isc_task_attach(task, &mgr->excl);
	UNLOCK(&mgr->lock);
}
1126
1127isc_result_t
1128isc_taskmgr_excltask(isc_taskmgr_t *mgr, isc_task_t **taskp) {
1129	isc_result_t result;
1130
1131	REQUIRE(VALID_MANAGER(mgr));
1132	REQUIRE(taskp != NULL && *taskp == NULL);
1133
1134	LOCK(&mgr->lock);
1135	if (mgr->excl != NULL) {
1136		isc_task_attach(mgr->excl, taskp);
1137		result = ISC_R_SUCCESS;
1138	} else if (mgr->exiting) {
1139		result = ISC_R_SHUTTINGDOWN;
1140	} else {
1141		result = ISC_R_NOTFOUND;
1142	}
1143	UNLOCK(&mgr->lock);
1144
1145	return (result);
1146}
1147
isc_result_t
isc_task_beginexclusive(isc_task_t *task) {
	isc_taskmgr_t *manager;

	/*
	 * Enter exclusive mode: pause the network manager so that
	 * 'task' -- which must be the manager's designated exclusive
	 * task (or the manager must be exiting) and currently running --
	 * is the only one doing work.  Returns ISC_R_LOCKBUSY if an
	 * exclusive request is already active.  Must be paired with
	 * isc_task_endexclusive().
	 */
	REQUIRE(VALID_TASK(task));

	manager = task->manager;

	REQUIRE(task->state == task_state_running);

	LOCK(&manager->lock);
	REQUIRE(task == manager->excl ||
		(manager->exiting && manager->excl == NULL));
	UNLOCK(&manager->lock);

	/* Atomically claim the single exclusive-mode slot. */
	if (!atomic_compare_exchange_strong(&manager->exclusive_req,
					    &(bool){ false }, true))
	{
		return (ISC_R_LOCKBUSY);
	}

	if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) {
		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
			      ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1),
			      "exclusive task mode: %s", "starting");
	}

	isc_nm_pause(manager->netmgr);

	if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) {
		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
			      ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1),
			      "exclusive task mode: %s", "started");
	}

	return (ISC_R_SUCCESS);
}
1185
void
isc_task_endexclusive(isc_task_t *task) {
	isc_taskmgr_t *manager;

	/*
	 * Leave exclusive mode entered by isc_task_beginexclusive():
	 * resume the network manager and release the exclusive-mode
	 * slot.  The final REQUIRE fails if no exclusive request was
	 * actually active.
	 */
	REQUIRE(VALID_TASK(task));
	REQUIRE(task->state == task_state_running);
	manager = task->manager;

	if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) {
		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
			      ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1),
			      "exclusive task mode: %s", "ending");
	}

	isc_nm_resume(manager->netmgr);

	if (isc_log_wouldlog(isc_lctx, ISC_LOG_DEBUG(1))) {
		isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL,
			      ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1),
			      "exclusive task mode: %s", "ended");
	}

	REQUIRE(atomic_compare_exchange_strong(&manager->exclusive_req,
					       &(bool){ true }, false));
}
1211
void
isc_task_pause(isc_task_t *task) {
	/*
	 * Pause 'task': events may still be queued to it, but none will
	 * be dispatched until a matching isc_task_unpause().  Pauses
	 * nest; only the first caller changes the state.
	 */
	REQUIRE(VALID_TASK(task));

	LOCK(&task->lock);
	task->pause_cnt++;
	if (task->pause_cnt > 1) {
		/*
		 * Someone already paused this task, just increase
		 * the number of pausing clients.
		 */
		UNLOCK(&task->lock);
		return;
	}

	INSIST(task->state == task_state_idle ||
	       task->state == task_state_ready ||
	       task->state == task_state_running);
	/*
	 * A running task cannot be parked in place; task_run() will
	 * move it pausing -> paused at the next safe point.
	 */
	if (task->state == task_state_running) {
		task->state = task_state_pausing;
	} else {
		task->state = task_state_paused;
	}
	UNLOCK(&task->lock);
}
1237
1238void
1239isc_task_unpause(isc_task_t *task) {
1240	bool was_idle = false;
1241
1242	REQUIRE(VALID_TASK(task));
1243
1244	LOCK(&task->lock);
1245	task->pause_cnt--;
1246	INSIST(task->pause_cnt >= 0);
1247	if (task->pause_cnt > 0) {
1248		UNLOCK(&task->lock);
1249		return;
1250	}
1251
1252	INSIST(task->state == task_state_paused ||
1253	       task->state == task_state_pausing);
1254	/* If the task was pausing we can't reschedule it */
1255	if (task->state == task_state_pausing) {
1256		task->state = task_state_running;
1257	} else {
1258		task->state = task_state_idle;
1259	}
1260	if (task->state == task_state_idle && !EMPTY(task->events)) {
1261		task->state = task_state_ready;
1262		was_idle = true;
1263	}
1264	UNLOCK(&task->lock);
1265
1266	if (was_idle) {
1267		task_ready(task);
1268	}
1269}
1270
/*
 * Set the task manager's operating mode; the atomic store makes this
 * safe to call from any thread without holding the manager lock.
 */
void
isc_taskmgr_setmode(isc_taskmgr_t *manager, isc_taskmgrmode_t mode) {
	atomic_store(&manager->mode, mode);
}
1275
/*
 * Return the task manager's current operating mode (atomic read, no
 * lock required).
 */
isc_taskmgrmode_t
isc_taskmgr_mode(isc_taskmgr_t *manager) {
	return (atomic_load(&manager->mode));
}
1280
/*
 * Flag or unflag 'task' as privileged.  Uses a release store;
 * NOTE(review): presumably paired with an acquire load inside
 * TASK_PRIVILEGED() -- confirm against the macro's definition.
 */
void
isc_task_setprivilege(isc_task_t *task, bool priv) {
	REQUIRE(VALID_TASK(task));

	atomic_store_release(&task->privileged, priv);
}
1287
/*
 * Return the task's own privileged flag, regardless of the manager's
 * current mode (contrast with isc_task_privileged() below).
 */
bool
isc_task_getprivilege(isc_task_t *task) {
	REQUIRE(VALID_TASK(task));

	return (TASK_PRIVILEGED(task));
}
1294
/*
 * Return true only when the manager is in a nonzero (i.e. privileged)
 * mode AND this task is itself flagged privileged.  Note the manager
 * mode is deliberately used as a boolean here.
 */
bool
isc_task_privileged(isc_task_t *task) {
	REQUIRE(VALID_TASK(task));

	return (isc_taskmgr_mode(task->manager) && TASK_PRIVILEGED(task));
}
1301
/*
 * Return true if the task has been asked to shut down.
 */
bool
isc_task_exiting(isc_task_t *task) {
	REQUIRE(VALID_TASK(task));

	return (TASK_SHUTTINGDOWN(task));
}
1308
1309#ifdef HAVE_LIBXML2
/*
 * Evaluate the libxml2 call 'a'; on a negative return code, jump to
 * the function-local "error" label with 'xmlrc' holding the code.
 */
#define TRY0(a)                     \
	do {                        \
		xmlrc = (a);        \
		if (xmlrc < 0)      \
			goto error; \
	} while (0)
/*
 * Render the task manager's state as XML via the supplied
 * xmlTextWriter ('writer0').  Returns the last libxml2 return code:
 * negative on failure, non-negative on success.
 *
 * Locking: mgr->lock is held for the whole walk; each task's lock is
 * held while that task's element is written.  On the error path a
 * non-NULL 'task' means we bailed out of the loop with that task's
 * lock still held, so it must be released before returning.
 */
int
isc_taskmgr_renderxml(isc_taskmgr_t *mgr, void *writer0) {
	isc_task_t *task = NULL;
	int xmlrc; /* set by the first TRY0 below before any goto */
	xmlTextWriterPtr writer = (xmlTextWriterPtr)writer0;

	LOCK(&mgr->lock);

	/*
	 * Write out the thread-model, and some details about each depending
	 * on which type is enabled.
	 */
	TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "thread-model"));
	TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "type"));
	TRY0(xmlTextWriterWriteString(writer, ISC_XMLCHAR "threaded"));
	TRY0(xmlTextWriterEndElement(writer)); /* type */

	TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "default-quantum"));
	TRY0(xmlTextWriterWriteFormatString(writer, "%d",
					    mgr->default_quantum));
	TRY0(xmlTextWriterEndElement(writer)); /* default-quantum */

	TRY0(xmlTextWriterEndElement(writer)); /* thread-model */

	/* One <task> element per task on the manager's list. */
	TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks"));
	task = ISC_LIST_HEAD(mgr->tasks);
	while (task != NULL) {
		LOCK(&task->lock);
		TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "task"));

		/* <name> is emitted only for named tasks. */
		if (task->name[0] != 0) {
			TRY0(xmlTextWriterStartElement(writer,
						       ISC_XMLCHAR "name"));
			TRY0(xmlTextWriterWriteFormatString(writer, "%s",
							    task->name));
			TRY0(xmlTextWriterEndElement(writer)); /* name */
		}

		TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "reference"
								   "s"));
		TRY0(xmlTextWriterWriteFormatString(
			writer, "%" PRIuFAST32,
			isc_refcount_current(&task->references)));
		TRY0(xmlTextWriterEndElement(writer)); /* references */

		/* The task's address doubles as its identifier. */
		TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "id"));
		TRY0(xmlTextWriterWriteFormatString(writer, "%p", task));
		TRY0(xmlTextWriterEndElement(writer)); /* id */

		TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "state"));
		TRY0(xmlTextWriterWriteFormatString(writer, "%s",
						    statenames[task->state]));
		TRY0(xmlTextWriterEndElement(writer)); /* state */

		TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "quantum"));
		TRY0(xmlTextWriterWriteFormatString(writer, "%d",
						    task->quantum));
		TRY0(xmlTextWriterEndElement(writer)); /* quantum */

		TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "events"));
		TRY0(xmlTextWriterWriteFormatString(writer, "%d",
						    task->nevents));
		TRY0(xmlTextWriterEndElement(writer)); /* events */

		TRY0(xmlTextWriterEndElement(writer));

		UNLOCK(&task->lock);
		task = ISC_LIST_NEXT(task, link);
	}
	TRY0(xmlTextWriterEndElement(writer)); /* tasks */

error:
	/* Reached via TRY0; 'task' != NULL iff its lock is still held. */
	if (task != NULL) {
		UNLOCK(&task->lock);
	}
	UNLOCK(&mgr->lock);

	return (xmlrc);
}
1395#endif /* HAVE_LIBXML2 */
1396
1397#ifdef HAVE_JSON_C
/*
 * On a NULL (failed) json-c allocation, record ISC_R_NOMEMORY in the
 * function-local 'result' and jump to the "error" label.
 */
#define CHECKMEM(m)                              \
	do {                                     \
		if (m == NULL) {                 \
			result = ISC_R_NOMEMORY; \
			goto error;              \
		}                                \
	} while (0)
1405
/*
 * Render the task manager's state into the json_object 'tasks0'.
 * Returns ISC_R_SUCCESS, or ISC_R_NOMEMORY if a json-c allocation
 * fails.
 *
 * Ownership: each json_object is handed off to its parent with
 * json_object_object_add()/json_object_array_add().  The 'array' of
 * per-task objects is attached to 'tasks' only after the loop
 * completes; on error the still-detached array is released with
 * json_object_put(), which also frees any task objects already added
 * to it.
 *
 * Locking mirrors isc_taskmgr_renderxml(): mgr->lock is held
 * throughout, a task's lock only while serializing that task, and the
 * error path unlocks 'task' if non-NULL.
 */
isc_result_t
isc_taskmgr_renderjson(isc_taskmgr_t *mgr, void *tasks0) {
	isc_result_t result = ISC_R_SUCCESS;
	isc_task_t *task = NULL;
	json_object *obj = NULL, *array = NULL, *taskobj = NULL;
	json_object *tasks = (json_object *)tasks0;

	LOCK(&mgr->lock);

	/*
	 * Write out the thread-model, and some details about each depending
	 * on which type is enabled.
	 */
	obj = json_object_new_string("threaded");
	CHECKMEM(obj);
	json_object_object_add(tasks, "thread-model", obj);

	obj = json_object_new_int(mgr->default_quantum);
	CHECKMEM(obj);
	json_object_object_add(tasks, "default-quantum", obj);

	array = json_object_new_array();
	CHECKMEM(array);

	for (task = ISC_LIST_HEAD(mgr->tasks); task != NULL;
	     task = ISC_LIST_NEXT(task, link))
	{
		char buf[255];

		LOCK(&task->lock);

		taskobj = json_object_new_object();
		CHECKMEM(taskobj);
		json_object_array_add(array, taskobj);

		/* The task's address doubles as its identifier. */
		snprintf(buf, sizeof(buf), "%p", task);
		obj = json_object_new_string(buf);
		CHECKMEM(obj);
		json_object_object_add(taskobj, "id", obj);

		/* "name" is included only for named tasks. */
		if (task->name[0] != 0) {
			obj = json_object_new_string(task->name);
			CHECKMEM(obj);
			json_object_object_add(taskobj, "name", obj);
		}

		obj = json_object_new_int(
			isc_refcount_current(&task->references));
		CHECKMEM(obj);
		json_object_object_add(taskobj, "references", obj);

		obj = json_object_new_string(statenames[task->state]);
		CHECKMEM(obj);
		json_object_object_add(taskobj, "state", obj);

		obj = json_object_new_int(task->quantum);
		CHECKMEM(obj);
		json_object_object_add(taskobj, "quantum", obj);

		obj = json_object_new_int(task->nevents);
		CHECKMEM(obj);
		json_object_object_add(taskobj, "events", obj);

		UNLOCK(&task->lock);
	}

	/* Hand the completed array to 'tasks'; NULL it so the error
	 * path (not taken from here on) won't double-free it. */
	json_object_object_add(tasks, "tasks", array);
	array = NULL;
	result = ISC_R_SUCCESS;

error:
	/* Free the detached array (and its children) on failure. */
	if (array != NULL) {
		json_object_put(array);
	}

	/* 'task' != NULL iff we bailed out with its lock held. */
	if (task != NULL) {
		UNLOCK(&task->lock);
	}
	UNLOCK(&mgr->lock);

	return (result);
}
1488#endif /* ifdef HAVE_JSON_C */
1489