1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * Copyright 2023 Red Hat
4 */
5
6#include "funnel-workqueue.h"
7
8#include <linux/atomic.h>
9#include <linux/cache.h>
10#include <linux/completion.h>
11#include <linux/err.h>
12#include <linux/kthread.h>
13#include <linux/percpu.h>
14
15#include "funnel-queue.h"
16#include "logger.h"
17#include "memory-alloc.h"
18#include "numeric.h"
19#include "permassert.h"
20#include "string-utils.h"
21
22#include "completion.h"
23#include "status-codes.h"
24
25static DEFINE_PER_CPU(unsigned int, service_queue_rotor);
26
27/**
28 * DOC: Work queue definition.
29 *
30 * There are two types of work queues: simple, with one worker thread, and round-robin, which uses
31 * a group of the former to do the work, and assigns work to them in round-robin fashion (roughly).
32 * Externally, both are represented via the same common sub-structure, though there's actually not
33 * a great deal of overlap between the two types internally.
34 */
35struct vdo_work_queue {
36	/* Name of just the work queue (e.g., "cpuQ12") */
37	char *name;
38	bool round_robin_mode;
39	struct vdo_thread *owner;
40	/* Life cycle functions, etc */
41	const struct vdo_work_queue_type *type;
42};
43
44struct simple_work_queue {
45	struct vdo_work_queue common;
46	struct funnel_queue *priority_lists[VDO_WORK_Q_MAX_PRIORITY + 1];
47	void *private;
48
49	/*
50	 * The fields above are unchanged after setup but often read, and are good candidates for
51	 * caching -- and if the max priority is 2, just fit in one x86-64 cache line if aligned.
52	 * The fields below are often modified as we sleep and wake, so we want a separate cache
53	 * line for performance.
54	 */
55
56	/* Any (0 or 1) worker threads waiting for new work to do */
57	wait_queue_head_t waiting_worker_threads ____cacheline_aligned;
58	/* Hack to reduce wakeup calls if the worker thread is running */
59	atomic_t idle;
60
61	/* These are infrequently used so in terms of performance we don't care where they land. */
62	struct task_struct *thread;
63	/* Notify creator once worker has initialized */
64	struct completion *started;
65};
66
67struct round_robin_work_queue {
68	struct vdo_work_queue common;
69	struct simple_work_queue **service_queues;
70	unsigned int num_service_queues;
71};
72
73static inline struct simple_work_queue *as_simple_work_queue(struct vdo_work_queue *queue)
74{
75	return ((queue == NULL) ?
76		NULL : container_of(queue, struct simple_work_queue, common));
77}
78
79static inline struct round_robin_work_queue *as_round_robin_work_queue(struct vdo_work_queue *queue)
80{
81	return ((queue == NULL) ?
82		 NULL :
83		 container_of(queue, struct round_robin_work_queue, common));
84}
85
86/* Processing normal completions. */
87
88/*
89 * Dequeue and return the next waiting completion, if any.
90 *
91 * We scan the funnel queues from highest priority to lowest, once; there is therefore a race
92 * condition where a high-priority completion can be enqueued followed by a lower-priority one, and
93 * we'll grab the latter (but we'll catch the high-priority item on the next call). If strict
94 * enforcement of priorities becomes necessary, this function will need fixing.
95 */
96static struct vdo_completion *poll_for_completion(struct simple_work_queue *queue)
97{
98	int i;
99
100	for (i = queue->common.type->max_priority; i >= 0; i--) {
101		struct funnel_queue_entry *link = vdo_funnel_queue_poll(queue->priority_lists[i]);
102
103		if (link != NULL)
104			return container_of(link, struct vdo_completion, work_queue_entry_link);
105	}
106
107	return NULL;
108}
109
110static void enqueue_work_queue_completion(struct simple_work_queue *queue,
111					  struct vdo_completion *completion)
112{
113	VDO_ASSERT_LOG_ONLY(completion->my_queue == NULL,
114			    "completion %px (fn %px) to enqueue (%px) is not already queued (%px)",
115			    completion, completion->callback, queue, completion->my_queue);
116	if (completion->priority == VDO_WORK_Q_DEFAULT_PRIORITY)
117		completion->priority = queue->common.type->default_priority;
118
119	if (VDO_ASSERT(completion->priority <= queue->common.type->max_priority,
120		       "priority is in range for queue") != VDO_SUCCESS)
121		completion->priority = 0;
122
123	completion->my_queue = &queue->common;
124
125	/* Funnel queue handles the synchronization for the put. */
126	vdo_funnel_queue_put(queue->priority_lists[completion->priority],
127			     &completion->work_queue_entry_link);
128
129	/*
130	 * Due to how funnel queue synchronization is handled (just atomic operations), the
131	 * simplest safe implementation here would be to wake-up any waiting threads after
132	 * enqueueing each item. Even if the funnel queue is not empty at the time of adding an
133	 * item to the queue, the consumer thread may not see this since it is not guaranteed to
134	 * have the same view of the queue as a producer thread.
135	 *
136	 * However, the above is wasteful so instead we attempt to minimize the number of thread
137	 * wakeups. Using an idle flag, and careful ordering using memory barriers, we should be
138	 * able to determine when the worker thread might be asleep or going to sleep. We use
139	 * cmpxchg to try to take ownership (vs other producer threads) of the responsibility for
140	 * waking the worker thread, so multiple wakeups aren't tried at once.
141	 *
142	 * This was tuned for some x86 boxes that were handy; it's untested whether doing the read
143	 * first is any better or worse for other platforms, even other x86 configurations.
144	 */
145	smp_mb();
146	if ((atomic_read(&queue->idle) != 1) || (atomic_cmpxchg(&queue->idle, 1, 0) != 1))
147		return;
148
149	/* There's a maximum of one thread in this list. */
150	wake_up(&queue->waiting_worker_threads);
151}
152
153static void run_start_hook(struct simple_work_queue *queue)
154{
155	if (queue->common.type->start != NULL)
156		queue->common.type->start(queue->private);
157}
158
159static void run_finish_hook(struct simple_work_queue *queue)
160{
161	if (queue->common.type->finish != NULL)
162		queue->common.type->finish(queue->private);
163}
164
165/*
166 * Wait for the next completion to process, or until kthread_should_stop indicates that it's time
167 * for us to shut down.
168 *
169 * If kthread_should_stop says it's time to stop but we have pending completions return a
170 * completion.
171 *
172 * Also update statistics relating to scheduler interactions.
173 */
174static struct vdo_completion *wait_for_next_completion(struct simple_work_queue *queue)
175{
176	struct vdo_completion *completion;
177	DEFINE_WAIT(wait);
178
179	while (true) {
180		prepare_to_wait(&queue->waiting_worker_threads, &wait,
181				TASK_INTERRUPTIBLE);
182		/*
183		 * Don't set the idle flag until a wakeup will not be lost.
184		 *
185		 * Force synchronization between setting the idle flag and checking the funnel
186		 * queue; the producer side will do them in the reverse order. (There's still a
187		 * race condition we've chosen to allow, because we've got a timeout below that
188		 * unwedges us if we hit it, but this may narrow the window a little.)
189		 */
190		atomic_set(&queue->idle, 1);
191		smp_mb(); /* store-load barrier between "idle" and funnel queue */
192
193		completion = poll_for_completion(queue);
194		if (completion != NULL)
195			break;
196
197		/*
198		 * We need to check for thread-stop after setting TASK_INTERRUPTIBLE state up
199		 * above. Otherwise, schedule() will put the thread to sleep and might miss a
200		 * wakeup from kthread_stop() call in vdo_finish_work_queue().
201		 */
202		if (kthread_should_stop())
203			break;
204
205		schedule();
206
207		/*
208		 * Most of the time when we wake, it should be because there's work to do. If it
209		 * was a spurious wakeup, continue looping.
210		 */
211		completion = poll_for_completion(queue);
212		if (completion != NULL)
213			break;
214	}
215
216	finish_wait(&queue->waiting_worker_threads, &wait);
217	atomic_set(&queue->idle, 0);
218
219	return completion;
220}
221
222static void process_completion(struct simple_work_queue *queue,
223			       struct vdo_completion *completion)
224{
225	if (VDO_ASSERT(completion->my_queue == &queue->common,
226		       "completion %px from queue %px marked as being in this queue (%px)",
227		       completion, queue, completion->my_queue) == VDO_SUCCESS)
228		completion->my_queue = NULL;
229
230	vdo_run_completion(completion);
231}
232
233static void service_work_queue(struct simple_work_queue *queue)
234{
235	run_start_hook(queue);
236
237	while (true) {
238		struct vdo_completion *completion = poll_for_completion(queue);
239
240		if (completion == NULL)
241			completion = wait_for_next_completion(queue);
242
243		if (completion == NULL) {
244			/* No completions but kthread_should_stop() was triggered. */
245			break;
246		}
247
248		process_completion(queue, completion);
249
250		/*
251		 * Be friendly to a CPU that has other work to do, if the kernel has told us to.
252		 * This speeds up some performance tests; that "other work" might include other VDO
253		 * threads.
254		 */
255		if (need_resched())
256			cond_resched();
257	}
258
259	run_finish_hook(queue);
260}
261
262static int work_queue_runner(void *ptr)
263{
264	struct simple_work_queue *queue = ptr;
265
266	complete(queue->started);
267	service_work_queue(queue);
268	return 0;
269}
270
271/* Creation & teardown */
272
273static void free_simple_work_queue(struct simple_work_queue *queue)
274{
275	unsigned int i;
276
277	for (i = 0; i <= VDO_WORK_Q_MAX_PRIORITY; i++)
278		vdo_free_funnel_queue(queue->priority_lists[i]);
279	vdo_free(queue->common.name);
280	vdo_free(queue);
281}
282
283static void free_round_robin_work_queue(struct round_robin_work_queue *queue)
284{
285	struct simple_work_queue **queue_table = queue->service_queues;
286	unsigned int count = queue->num_service_queues;
287	unsigned int i;
288
289	queue->service_queues = NULL;
290
291	for (i = 0; i < count; i++)
292		free_simple_work_queue(queue_table[i]);
293	vdo_free(queue_table);
294	vdo_free(queue->common.name);
295	vdo_free(queue);
296}
297
298void vdo_free_work_queue(struct vdo_work_queue *queue)
299{
300	if (queue == NULL)
301		return;
302
303	vdo_finish_work_queue(queue);
304
305	if (queue->round_robin_mode)
306		free_round_robin_work_queue(as_round_robin_work_queue(queue));
307	else
308		free_simple_work_queue(as_simple_work_queue(queue));
309}
310
311static int make_simple_work_queue(const char *thread_name_prefix, const char *name,
312				  struct vdo_thread *owner, void *private,
313				  const struct vdo_work_queue_type *type,
314				  struct simple_work_queue **queue_ptr)
315{
316	DECLARE_COMPLETION_ONSTACK(started);
317	struct simple_work_queue *queue;
318	int i;
319	struct task_struct *thread = NULL;
320	int result;
321
322	VDO_ASSERT_LOG_ONLY((type->max_priority <= VDO_WORK_Q_MAX_PRIORITY),
323			    "queue priority count %u within limit %u", type->max_priority,
324			    VDO_WORK_Q_MAX_PRIORITY);
325
326	result = vdo_allocate(1, struct simple_work_queue, "simple work queue", &queue);
327	if (result != VDO_SUCCESS)
328		return result;
329
330	queue->private = private;
331	queue->started = &started;
332	queue->common.type = type;
333	queue->common.owner = owner;
334	init_waitqueue_head(&queue->waiting_worker_threads);
335
336	result = vdo_duplicate_string(name, "queue name", &queue->common.name);
337	if (result != VDO_SUCCESS) {
338		vdo_free(queue);
339		return -ENOMEM;
340	}
341
342	for (i = 0; i <= type->max_priority; i++) {
343		result = vdo_make_funnel_queue(&queue->priority_lists[i]);
344		if (result != VDO_SUCCESS) {
345			free_simple_work_queue(queue);
346			return result;
347		}
348	}
349
350	thread = kthread_run(work_queue_runner, queue, "%s:%s", thread_name_prefix,
351			     queue->common.name);
352	if (IS_ERR(thread)) {
353		free_simple_work_queue(queue);
354		return (int) PTR_ERR(thread);
355	}
356
357	queue->thread = thread;
358
359	/*
360	 * If we don't wait to ensure the thread is running VDO code, a quick kthread_stop (due to
361	 * errors elsewhere) could cause it to never get as far as running VDO, skipping the
362	 * cleanup code.
363	 *
364	 * Eventually we should just make that path safe too, and then we won't need this
365	 * synchronization.
366	 */
367	wait_for_completion(&started);
368
369	*queue_ptr = queue;
370	return VDO_SUCCESS;
371}
372
373/**
374 * vdo_make_work_queue() - Create a work queue; if multiple threads are requested, completions will
375 *                         be distributed to them in round-robin fashion.
376 *
377 * Each queue is associated with a struct vdo_thread which has a single vdo thread id. Regardless
378 * of the actual number of queues and threads allocated here, code outside of the queue
379 * implementation will treat this as a single zone.
380 */
381int vdo_make_work_queue(const char *thread_name_prefix, const char *name,
382			struct vdo_thread *owner, const struct vdo_work_queue_type *type,
383			unsigned int thread_count, void *thread_privates[],
384			struct vdo_work_queue **queue_ptr)
385{
386	struct round_robin_work_queue *queue;
387	int result;
388	char thread_name[TASK_COMM_LEN];
389	unsigned int i;
390
391	if (thread_count == 1) {
392		struct simple_work_queue *simple_queue;
393		void *context = ((thread_privates != NULL) ? thread_privates[0] : NULL);
394
395		result = make_simple_work_queue(thread_name_prefix, name, owner, context,
396						type, &simple_queue);
397		if (result == VDO_SUCCESS)
398			*queue_ptr = &simple_queue->common;
399		return result;
400	}
401
402	result = vdo_allocate(1, struct round_robin_work_queue, "round-robin work queue",
403			      &queue);
404	if (result != VDO_SUCCESS)
405		return result;
406
407	result = vdo_allocate(thread_count, struct simple_work_queue *,
408			      "subordinate work queues", &queue->service_queues);
409	if (result != VDO_SUCCESS) {
410		vdo_free(queue);
411		return result;
412	}
413
414	queue->num_service_queues = thread_count;
415	queue->common.round_robin_mode = true;
416	queue->common.owner = owner;
417
418	result = vdo_duplicate_string(name, "queue name", &queue->common.name);
419	if (result != VDO_SUCCESS) {
420		vdo_free(queue->service_queues);
421		vdo_free(queue);
422		return -ENOMEM;
423	}
424
425	*queue_ptr = &queue->common;
426
427	for (i = 0; i < thread_count; i++) {
428		void *context = ((thread_privates != NULL) ? thread_privates[i] : NULL);
429
430		snprintf(thread_name, sizeof(thread_name), "%s%u", name, i);
431		result = make_simple_work_queue(thread_name_prefix, thread_name, owner,
432						context, type, &queue->service_queues[i]);
433		if (result != VDO_SUCCESS) {
434			queue->num_service_queues = i;
435			/* Destroy previously created subordinates. */
436			vdo_free_work_queue(vdo_forget(*queue_ptr));
437			return result;
438		}
439	}
440
441	return VDO_SUCCESS;
442}
443
444static void finish_simple_work_queue(struct simple_work_queue *queue)
445{
446	if (queue->thread == NULL)
447		return;
448
449	/* Tells the worker thread to shut down and waits for it to exit. */
450	kthread_stop(queue->thread);
451	queue->thread = NULL;
452}
453
454static void finish_round_robin_work_queue(struct round_robin_work_queue *queue)
455{
456	struct simple_work_queue **queue_table = queue->service_queues;
457	unsigned int count = queue->num_service_queues;
458	unsigned int i;
459
460	for (i = 0; i < count; i++)
461		finish_simple_work_queue(queue_table[i]);
462}
463
464/* No enqueueing of completions should be done once this function is called. */
465void vdo_finish_work_queue(struct vdo_work_queue *queue)
466{
467	if (queue == NULL)
468		return;
469
470	if (queue->round_robin_mode)
471		finish_round_robin_work_queue(as_round_robin_work_queue(queue));
472	else
473		finish_simple_work_queue(as_simple_work_queue(queue));
474}
475
476/* Debugging dumps */
477
478static void dump_simple_work_queue(struct simple_work_queue *queue)
479{
480	const char *thread_status = "no threads";
481	char task_state_report = '-';
482
483	if (queue->thread != NULL) {
484		task_state_report = task_state_to_char(queue->thread);
485		thread_status = atomic_read(&queue->idle) ? "idle" : "running";
486	}
487
488	vdo_log_info("workQ %px (%s) %s (%c)", &queue->common, queue->common.name,
489		     thread_status, task_state_report);
490
491	/* ->waiting_worker_threads wait queue status? anyone waiting? */
492}
493
494/*
495 * Write to the buffer some info about the completion, for logging. Since the common use case is
496 * dumping info about a lot of completions to syslog all at once, the format favors brevity over
497 * readability.
498 */
499void vdo_dump_work_queue(struct vdo_work_queue *queue)
500{
501	if (queue->round_robin_mode) {
502		struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
503		unsigned int i;
504
505		for (i = 0; i < round_robin->num_service_queues; i++)
506			dump_simple_work_queue(round_robin->service_queues[i]);
507	} else {
508		dump_simple_work_queue(as_simple_work_queue(queue));
509	}
510}
511
512static void get_function_name(void *pointer, char *buffer, size_t buffer_length)
513{
514	if (pointer == NULL) {
515		/*
516		 * Format "%ps" logs a null pointer as "(null)" with a bunch of leading spaces. We
517		 * sometimes use this when logging lots of data; don't be so verbose.
518		 */
519		strscpy(buffer, "-", buffer_length);
520	} else {
521		/*
522		 * Use a pragma to defeat gcc's format checking, which doesn't understand that
523		 * "%ps" actually does support a precision spec in Linux kernel code.
524		 */
525		char *space;
526
527#pragma GCC diagnostic push
528#pragma GCC diagnostic ignored "-Wformat"
529		snprintf(buffer, buffer_length, "%.*ps", buffer_length - 1, pointer);
530#pragma GCC diagnostic pop
531
532		space = strchr(buffer, ' ');
533		if (space != NULL)
534			*space = '\0';
535	}
536}
537
538void vdo_dump_completion_to_buffer(struct vdo_completion *completion, char *buffer,
539				   size_t length)
540{
541	size_t current_length =
542		scnprintf(buffer, length, "%.*s/", TASK_COMM_LEN,
543			  (completion->my_queue == NULL ? "-" : completion->my_queue->name));
544
545	if (current_length < length - 1) {
546		get_function_name((void *) completion->callback, buffer + current_length,
547				  length - current_length);
548	}
549}
550
551/* Completion submission */
552/*
553 * If the completion has a timeout that has already passed, the timeout handler function may be
554 * invoked by this function.
555 */
556void vdo_enqueue_work_queue(struct vdo_work_queue *queue,
557			    struct vdo_completion *completion)
558{
559	/*
560	 * Convert the provided generic vdo_work_queue to the simple_work_queue to actually queue
561	 * on.
562	 */
563	struct simple_work_queue *simple_queue = NULL;
564
565	if (!queue->round_robin_mode) {
566		simple_queue = as_simple_work_queue(queue);
567	} else {
568		struct round_robin_work_queue *round_robin = as_round_robin_work_queue(queue);
569
570		/*
571		 * It shouldn't be a big deal if the same rotor gets used for multiple work queues.
572		 * Any patterns that might develop are likely to be disrupted by random ordering of
573		 * multiple completions and migration between cores, unless the load is so light as
574		 * to be regular in ordering of tasks and the threads are confined to individual
575		 * cores; with a load that light we won't care.
576		 */
577		unsigned int rotor = this_cpu_inc_return(service_queue_rotor);
578		unsigned int index = rotor % round_robin->num_service_queues;
579
580		simple_queue = round_robin->service_queues[index];
581	}
582
583	enqueue_work_queue_completion(simple_queue, completion);
584}
585
586/* Misc */
587
588/*
589 * Return the work queue pointer recorded at initialization time in the work-queue stack handle
590 * initialized on the stack of the current thread, if any.
591 */
592static struct simple_work_queue *get_current_thread_work_queue(void)
593{
594	/*
595	 * In interrupt context, if a vdo thread is what got interrupted, the calls below will find
596	 * the queue for the thread which was interrupted. However, the interrupted thread may have
597	 * been processing a completion, in which case starting to process another would violate
598	 * our concurrency assumptions.
599	 */
600	if (in_interrupt())
601		return NULL;
602
603	if (kthread_func(current) != work_queue_runner)
604		/* Not a VDO work queue thread. */
605		return NULL;
606
607	return kthread_data(current);
608}
609
610struct vdo_work_queue *vdo_get_current_work_queue(void)
611{
612	struct simple_work_queue *queue = get_current_thread_work_queue();
613
614	return (queue == NULL) ? NULL : &queue->common;
615}
616
617struct vdo_thread *vdo_get_work_queue_owner(struct vdo_work_queue *queue)
618{
619	return queue->owner;
620}
621
622/**
623 * vdo_get_work_queue_private_data() - Returns the private data for the current thread's work
624 *                                     queue, or NULL if none or if the current thread is not a
625 *                                     work queue thread.
626 */
627void *vdo_get_work_queue_private_data(void)
628{
629	struct simple_work_queue *queue = get_current_thread_work_queue();
630
631	return (queue != NULL) ? queue->private : NULL;
632}
633
634bool vdo_work_queue_type_is(struct vdo_work_queue *queue,
635			    const struct vdo_work_queue_type *type)
636{
637	return (queue->type == type);
638}
639