// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "funnel-requestqueue.h"

#include <linux/atomic.h>
#include <linux/compiler.h>
#include <linux/wait.h>

#include "funnel-queue.h"
#include "logger.h"
#include "memory-alloc.h"
#include "thread-utils.h"

/*
 * This queue will attempt to handle requests in reasonably sized batches instead of reacting
 * immediately to each new request. The wait time between batches is dynamically adjusted up or
 * down to try to balance responsiveness against wasted thread run time.
 *
 * If the wait time becomes long enough, the queue will become dormant and must be explicitly
 * awoken when a new request is enqueued. The enqueue operation updates "newest" in the funnel
 * queue via xchg (which is a memory barrier), and later checks "dormant" to decide whether to do a
 * wakeup of the worker thread.
 *
 * When deciding to go to sleep, the worker thread sets "dormant" and then examines "newest" to
 * decide if the funnel queue is idle. In dormant mode, the last examination of "newest" before
 * going to sleep is done inside the wait_event_interruptible() macro, after a point where one or
 * more memory barriers have been issued. (Preparing to sleep uses spin locks.) Even if the funnel
 * queue's "next" field update isn't visible yet to make the entry accessible, its existence will
 * kick the worker thread out of dormant mode and back into timer-based mode.
 *
 * Unbatched requests are used to communicate between different zone threads and will also cause
 * the queue to awaken immediately.
 */

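/*
 * Typical usage (sketch): a client creates the queue with uds_make_request_queue(), submits work
 * from producer threads with uds_request_queue_enqueue(), and shuts the queue down with
 * uds_request_queue_finish(), which drains any remaining requests on the worker thread before
 * freeing the queue.
 */

/* All wait times below are in nanoseconds; the batch limits are request counts. */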
enum {
	NANOSECOND = 1,
	MICROSECOND = 1000 * NANOSECOND,
	MILLISECOND = 1000 * MICROSECOND,
	DEFAULT_WAIT_TIME = 20 * MICROSECOND,
	MINIMUM_WAIT_TIME = DEFAULT_WAIT_TIME / 2,
	MAXIMUM_WAIT_TIME = MILLISECOND,
	MINIMUM_BATCH = 32,
	MAXIMUM_BATCH = 64,
};

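/*
 * A batching request queue: producers push requests onto two funnel queues, and a single worker
 * thread drains them, throttling itself between batches.
 */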
struct uds_request_queue {
	/* Wait queue for synchronizing producers and consumer */
	struct wait_queue_head wait_head;
	/* Function to process a request */
	uds_request_queue_processor_fn processor;
	/* Queue of new incoming requests */
	struct funnel_queue *main_queue;
	/* Queue of old requests to retry */
	struct funnel_queue *retry_queue;
	/* The worker thread processing requests */
	struct thread *thread;
	/* True if the worker was started */
	bool started;
	/* When true, requests can be enqueued */
	bool running;
	/* A flag set when the worker is waiting without a timeout */
	atomic_t dormant;
};

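/* Return the next request to process, checking the retry queue before the main queue. */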
static inline struct uds_request *poll_queues(struct uds_request_queue *queue)
{
	struct funnel_queue_entry *entry;

	entry = vdo_funnel_queue_poll(queue->retry_queue);
	if (entry != NULL)
		return container_of(entry, struct uds_request, queue_link);

	entry = vdo_funnel_queue_poll(queue->main_queue);
	if (entry != NULL)
		return container_of(entry, struct uds_request, queue_link);

	return NULL;
}

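/*
 * Check whether both funnel queues appear idle. A non-idle queue will kick a dormant worker back
 * into timed waiting even if the new entry is not yet visible to poll_queues().
 */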
static inline bool are_queues_idle(struct uds_request_queue *queue)
{
	return vdo_is_funnel_queue_idle(queue->retry_queue) &&
	       vdo_is_funnel_queue_idle(queue->main_queue);
}

/*
 * Determine whether there is a request to process, and return it if there is one. The boolean
 * return value is the wake condition for the wait_event() macros: it is true when a request is
 * available or the queue is shutting down, and false when the worker thread may keep sleeping.
 * *waited_ptr is set whenever the worker has to sleep before the next request arrives.
 */
static inline bool dequeue_request(struct uds_request_queue *queue,
				   struct uds_request **request_ptr, bool *waited_ptr)
{
	struct uds_request *request = poll_queues(queue);

	if (request != NULL) {
		*request_ptr = request;
		return true;
	}

	if (!READ_ONCE(queue->running)) {
		/* Wake the worker thread so it can exit. */
		*request_ptr = NULL;
		return true;
	}

	*request_ptr = NULL;
	*waited_ptr = true;
	return false;
}

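/*
 * Wait for the next request. A dormant worker sleeps until it is explicitly woken; otherwise the
 * worker sleeps for at most "timeout" nanoseconds before checking the queues again.
 */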
static void wait_for_request(struct uds_request_queue *queue, bool dormant,
			     unsigned long timeout, struct uds_request **request,
			     bool *waited)
{
	if (dormant) {
		wait_event_interruptible(queue->wait_head,
					 (dequeue_request(queue, request, waited) ||
					  !are_queues_idle(queue)));
		return;
	}

	wait_event_interruptible_hrtimeout(queue->wait_head,
					   dequeue_request(queue, request, waited),
					   ns_to_ktime(timeout));
}

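/*
 * The main loop of the worker thread: process requests as they arrive, adjust the wait time
 * between batches to balance batch size against latency, and drain any remaining requests once
 * the queue has been told to stop running.
 */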
static void request_queue_worker(void *arg)
{
	struct uds_request_queue *queue = arg;
	struct uds_request *request = NULL;
	unsigned long time_batch = DEFAULT_WAIT_TIME;
	bool dormant = atomic_read(&queue->dormant);
	bool waited = false;
	long current_batch = 0;

	for (;;) {
		wait_for_request(queue, dormant, time_batch, &request, &waited);
		if (likely(request != NULL)) {
			current_batch++;
			queue->processor(request);
		} else if (!READ_ONCE(queue->running)) {
			break;
		}

		if (dormant) {
			/*
			 * The queue has been roused from dormancy. Clear the flag so enqueuers can
			 * stop broadcasting. No fence is needed for this transition.
			 */
			atomic_set(&queue->dormant, false);
			dormant = false;
			time_batch = DEFAULT_WAIT_TIME;
		} else if (waited) {
			/*
			 * We waited for this request to show up. Adjust the wait time to smooth
			 * out the batch size.
			 */
			if (current_batch < MINIMUM_BATCH) {
				/*
				 * If the last batch of requests was too small, increase the wait
				 * time.
				 */
				time_batch += time_batch / 4;
				if (time_batch >= MAXIMUM_WAIT_TIME) {
					atomic_set(&queue->dormant, true);
					dormant = true;
				}
			} else if (current_batch > MAXIMUM_BATCH) {
				/*
				 * If the last batch of requests was too large, decrease the wait
				 * time.
				 */
				time_batch -= time_batch / 4;
				if (time_batch < MINIMUM_WAIT_TIME)
					time_batch = MINIMUM_WAIT_TIME;
			}
			current_batch = 0;
		}
	}

	/*
	 * Ensure that we process any remaining requests that were enqueued before trying to shut
	 * down. The corresponding write barrier is in uds_request_queue_finish().
	 */
	smp_rmb();
	while ((request = poll_queues(queue)) != NULL)
		queue->processor(request);
}

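/*
 * Allocate a request queue, create its funnel queues, and start the worker thread. On any
 * failure, the partially constructed queue is torn down with uds_request_queue_finish().
 */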
int uds_make_request_queue(const char *queue_name,
			   uds_request_queue_processor_fn processor,
			   struct uds_request_queue **queue_ptr)
{
	int result;
	struct uds_request_queue *queue;

	result = vdo_allocate(1, struct uds_request_queue, __func__, &queue);
	if (result != VDO_SUCCESS)
		return result;

	queue->processor = processor;
	queue->running = true;
	atomic_set(&queue->dormant, false);
	init_waitqueue_head(&queue->wait_head);

	result = vdo_make_funnel_queue(&queue->main_queue);
	if (result != VDO_SUCCESS) {
		uds_request_queue_finish(queue);
		return result;
	}

	result = vdo_make_funnel_queue(&queue->retry_queue);
	if (result != VDO_SUCCESS) {
		uds_request_queue_finish(queue);
		return result;
	}

	result = vdo_create_thread(request_queue_worker, queue, queue_name,
				   &queue->thread);
	if (result != VDO_SUCCESS) {
		uds_request_queue_finish(queue);
		return result;
	}

	queue->started = true;
	*queue_ptr = queue;
	return UDS_SUCCESS;
}

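/* Wake the worker thread only if it is actually sleeping on the wait queue. */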
static inline void wake_up_worker(struct uds_request_queue *queue)
{
	if (wq_has_sleeper(&queue->wait_head))
		wake_up(&queue->wait_head);
}

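/*
 * Add a request to the queue: requeued requests go to the retry queue, everything else to the
 * main queue. A dormant worker, or an unbatched request, triggers an immediate wakeup.
 */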
void uds_request_queue_enqueue(struct uds_request_queue *queue,
			       struct uds_request *request)
{
	struct funnel_queue *sub_queue;
	bool unbatched = request->unbatched;

	sub_queue = request->requeued ? queue->retry_queue : queue->main_queue;
	vdo_funnel_queue_put(sub_queue, &request->queue_link);

	/*
	 * We must wake the worker thread when it is dormant. A read fence isn't needed here since
	 * we know the queue operation acts as one.
	 */
	if (atomic_read(&queue->dormant) || unbatched)
		wake_up_worker(queue);
}

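/*
 * Shut down the request queue: stop accepting new requests, wake and join the worker thread if it
 * was started, then free the funnel queues and the queue itself.
 */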
void uds_request_queue_finish(struct uds_request_queue *queue)
{
	if (queue == NULL)
		return;

	/*
	 * This memory barrier ensures that any requests we queued will be seen. The point is that
	 * when dequeue_request() sees the following update to the running flag, it will also be
	 * able to see any change we made to a next field in the funnel queue entry. The
	 * corresponding read barrier is in request_queue_worker().
	 */
	smp_wmb();
	WRITE_ONCE(queue->running, false);

	if (queue->started) {
		wake_up_worker(queue);
		vdo_join_threads(queue->thread);
	}

	vdo_free_funnel_queue(queue->main_queue);
	vdo_free_funnel_queue(queue->retry_queue);
	vdo_free(queue);
}