1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * Copyright 2023 Red Hat
4 */
5
6#include "io-submitter.h"
7
8#include <linux/bio.h>
9#include <linux/kernel.h>
10#include <linux/mutex.h>
11
12#include "memory-alloc.h"
13#include "permassert.h"
14
15#include "data-vio.h"
16#include "logger.h"
17#include "types.h"
18#include "vdo.h"
19#include "vio.h"
20
21/*
22 * Submission of bio operations to the underlying storage device will go through a separate work
23 * queue thread (or more than one) to prevent blocking in other threads if the storage device has a
24 * full queue. The plug structure allows that thread to do better batching of requests to make the
25 * I/O more efficient.
26 *
27 * When multiple worker threads are used, a thread is chosen for a I/O operation submission based
28 * on the PBN, so a given PBN will consistently wind up on the same thread. Flush operations are
29 * assigned round-robin.
30 *
31 * The map (protected by the mutex) collects pending I/O operations so that the worker thread can
32 * reorder them to try to encourage I/O request merging in the request queue underneath.
33 */
34struct bio_queue_data {
35	struct vdo_work_queue *queue;
36	struct blk_plug plug;
37	struct int_map *map;
38	struct mutex lock;
39	unsigned int queue_number;
40};
41
42struct io_submitter {
43	unsigned int num_bio_queues_used;
44	unsigned int bio_queue_rotation_interval;
45	struct bio_queue_data bio_queue_data[];
46};
47
48static void start_bio_queue(void *ptr)
49{
50	struct bio_queue_data *bio_queue_data = ptr;
51
52	blk_start_plug(&bio_queue_data->plug);
53}
54
55static void finish_bio_queue(void *ptr)
56{
57	struct bio_queue_data *bio_queue_data = ptr;
58
59	blk_finish_plug(&bio_queue_data->plug);
60}
61
62static const struct vdo_work_queue_type bio_queue_type = {
63	.start = start_bio_queue,
64	.finish = finish_bio_queue,
65	.max_priority = BIO_Q_MAX_PRIORITY,
66	.default_priority = BIO_Q_DATA_PRIORITY,
67};
68
69/**
70 * count_all_bios() - Determine which bio counter to use.
71 * @vio: The vio associated with the bio.
72 * @bio: The bio to count.
73 */
74static void count_all_bios(struct vio *vio, struct bio *bio)
75{
76	struct atomic_statistics *stats = &vio->completion.vdo->stats;
77
78	if (is_data_vio(vio)) {
79		vdo_count_bios(&stats->bios_out, bio);
80		return;
81	}
82
83	vdo_count_bios(&stats->bios_meta, bio);
84	if (vio->type == VIO_TYPE_RECOVERY_JOURNAL)
85		vdo_count_bios(&stats->bios_journal, bio);
86	else if (vio->type == VIO_TYPE_BLOCK_MAP)
87		vdo_count_bios(&stats->bios_page_cache, bio);
88}
89
/**
 * assert_in_bio_zone() - Check that the caller may submit I/O for a vio.
 * @vio: The vio to check.
 *
 * Asserts both that the caller is not running in interrupt context and that the vio is in its
 * correct bio zone. A failed assertion is only logged.
 */
static void assert_in_bio_zone(struct vio *vio)
{
	VDO_ASSERT_LOG_ONLY(!in_interrupt(), "not in interrupt context");
	assert_vio_in_bio_zone(vio);
}
100
101/**
102 * send_bio_to_device() - Update stats and tracing info, then submit the supplied bio to the OS for
103 *                        processing.
104 * @vio: The vio associated with the bio.
105 * @bio: The bio to submit to the OS.
106 */
107static void send_bio_to_device(struct vio *vio, struct bio *bio)
108{
109	struct vdo *vdo = vio->completion.vdo;
110
111	assert_in_bio_zone(vio);
112	atomic64_inc(&vdo->stats.bios_submitted);
113	count_all_bios(vio, bio);
114	bio_set_dev(bio, vdo_get_backing_device(vdo));
115	submit_bio_noacct(bio);
116}
117
118/**
119 * vdo_submit_vio() - Submits a vio's bio to the underlying block device. May block if the device
120 *		      is busy. This callback should be used by vios which did not attempt to merge.
121 */
122void vdo_submit_vio(struct vdo_completion *completion)
123{
124	struct vio *vio = as_vio(completion);
125
126	send_bio_to_device(vio, vio->bio);
127}
128
129/**
130 * get_bio_list() - Extract the list of bios to submit from a vio.
131 * @vio: The vio submitting I/O.
132 *
133 * The list will always contain at least one entry (the bio for the vio on which it is called), but
134 * other bios may have been merged with it as well.
135 *
136 * Return: bio  The head of the bio list to submit.
137 */
138static struct bio *get_bio_list(struct vio *vio)
139{
140	struct bio *bio;
141	struct io_submitter *submitter = vio->completion.vdo->io_submitter;
142	struct bio_queue_data *bio_queue_data = &(submitter->bio_queue_data[vio->bio_zone]);
143
144	assert_in_bio_zone(vio);
145
146	mutex_lock(&bio_queue_data->lock);
147	vdo_int_map_remove(bio_queue_data->map,
148			   vio->bios_merged.head->bi_iter.bi_sector);
149	vdo_int_map_remove(bio_queue_data->map,
150			   vio->bios_merged.tail->bi_iter.bi_sector);
151	bio = vio->bios_merged.head;
152	bio_list_init(&vio->bios_merged);
153	mutex_unlock(&bio_queue_data->lock);
154
155	return bio;
156}
157
158/**
159 * submit_data_vio() - Submit a data_vio's bio to the storage below along with
160 *		       any bios that have been merged with it.
161 *
162 * Context: This call may block and so should only be called from a bio thread.
163 */
164static void submit_data_vio(struct vdo_completion *completion)
165{
166	struct bio *bio, *next;
167	struct vio *vio = as_vio(completion);
168
169	assert_in_bio_zone(vio);
170	for (bio = get_bio_list(vio); bio != NULL; bio = next) {
171		next = bio->bi_next;
172		bio->bi_next = NULL;
173		send_bio_to_device((struct vio *) bio->bi_private, bio);
174	}
175}
176
177/**
178 * get_mergeable_locked() - Attempt to find an already queued bio that the current bio can be
179 *                          merged with.
180 * @map: The bio map to use for merging.
181 * @vio: The vio we want to merge.
182 * @back_merge: Set to true for a back merge, false for a front merge.
183 *
184 * There are two types of merging possible, forward and backward, which are distinguished by a flag
185 * that uses kernel elevator terminology.
186 *
187 * Return: the vio to merge to, NULL if no merging is possible.
188 */
189static struct vio *get_mergeable_locked(struct int_map *map, struct vio *vio,
190					bool back_merge)
191{
192	struct bio *bio = vio->bio;
193	sector_t merge_sector = bio->bi_iter.bi_sector;
194	struct vio *vio_merge;
195
196	if (back_merge)
197		merge_sector -= VDO_SECTORS_PER_BLOCK;
198	else
199		merge_sector += VDO_SECTORS_PER_BLOCK;
200
201	vio_merge = vdo_int_map_get(map, merge_sector);
202
203	if (vio_merge == NULL)
204		return NULL;
205
206	if (vio->completion.priority != vio_merge->completion.priority)
207		return NULL;
208
209	if (bio_data_dir(bio) != bio_data_dir(vio_merge->bio))
210		return NULL;
211
212	if (bio_list_empty(&vio_merge->bios_merged))
213		return NULL;
214
215	if (back_merge) {
216		return (vio_merge->bios_merged.tail->bi_iter.bi_sector == merge_sector ?
217			vio_merge : NULL);
218	}
219
220	return (vio_merge->bios_merged.head->bi_iter.bi_sector == merge_sector ?
221		vio_merge : NULL);
222}
223
224static int map_merged_vio(struct int_map *bio_map, struct vio *vio)
225{
226	int result;
227	sector_t bio_sector;
228
229	bio_sector = vio->bios_merged.head->bi_iter.bi_sector;
230	result = vdo_int_map_put(bio_map, bio_sector, vio, true, NULL);
231	if (result != VDO_SUCCESS)
232		return result;
233
234	bio_sector = vio->bios_merged.tail->bi_iter.bi_sector;
235	return vdo_int_map_put(bio_map, bio_sector, vio, true, NULL);
236}
237
238static int merge_to_prev_tail(struct int_map *bio_map, struct vio *vio,
239			      struct vio *prev_vio)
240{
241	vdo_int_map_remove(bio_map, prev_vio->bios_merged.tail->bi_iter.bi_sector);
242	bio_list_merge(&prev_vio->bios_merged, &vio->bios_merged);
243	return map_merged_vio(bio_map, prev_vio);
244}
245
246static int merge_to_next_head(struct int_map *bio_map, struct vio *vio,
247			      struct vio *next_vio)
248{
249	/*
250	 * Handle "next merge" and "gap fill" cases the same way so as to reorder bios in a way
251	 * that's compatible with using funnel queues in work queues. This avoids removing an
252	 * existing completion.
253	 */
254	vdo_int_map_remove(bio_map, next_vio->bios_merged.head->bi_iter.bi_sector);
255	bio_list_merge_head(&next_vio->bios_merged, &vio->bios_merged);
256	return map_merged_vio(bio_map, next_vio);
257}
258
259/**
260 * try_bio_map_merge() - Attempt to merge a vio's bio with other pending I/Os.
261 * @vio: The vio to merge.
262 *
263 * Currently this is only used for data_vios, but is broken out for future use with metadata vios.
264 *
265 * Return: whether or not the vio was merged.
266 */
267static bool try_bio_map_merge(struct vio *vio)
268{
269	int result;
270	bool merged = true;
271	struct bio *bio = vio->bio;
272	struct vio *prev_vio, *next_vio;
273	struct vdo *vdo = vio->completion.vdo;
274	struct bio_queue_data *bio_queue_data =
275		&vdo->io_submitter->bio_queue_data[vio->bio_zone];
276
277	bio->bi_next = NULL;
278	bio_list_init(&vio->bios_merged);
279	bio_list_add(&vio->bios_merged, bio);
280
281	mutex_lock(&bio_queue_data->lock);
282	prev_vio = get_mergeable_locked(bio_queue_data->map, vio, true);
283	next_vio = get_mergeable_locked(bio_queue_data->map, vio, false);
284	if (prev_vio == next_vio)
285		next_vio = NULL;
286
287	if ((prev_vio == NULL) && (next_vio == NULL)) {
288		/* no merge. just add to bio_queue */
289		merged = false;
290		result = vdo_int_map_put(bio_queue_data->map,
291					 bio->bi_iter.bi_sector,
292					 vio, true, NULL);
293	} else if (next_vio == NULL) {
294		/* Only prev. merge to prev's tail */
295		result = merge_to_prev_tail(bio_queue_data->map, vio, prev_vio);
296	} else {
297		/* Only next. merge to next's head */
298		result = merge_to_next_head(bio_queue_data->map, vio, next_vio);
299	}
300	mutex_unlock(&bio_queue_data->lock);
301
302	/* We don't care about failure of int_map_put in this case. */
303	VDO_ASSERT_LOG_ONLY(result == VDO_SUCCESS, "bio map insertion succeeds");
304	return merged;
305}
306
307/**
308 * vdo_submit_data_vio() - Submit I/O for a data_vio.
309 * @data_vio: the data_vio for which to issue I/O.
310 *
311 * If possible, this I/O will be merged other pending I/Os. Otherwise, the data_vio will be sent to
312 * the appropriate bio zone directly.
313 */
314void vdo_submit_data_vio(struct data_vio *data_vio)
315{
316	if (try_bio_map_merge(&data_vio->vio))
317		return;
318
319	launch_data_vio_bio_zone_callback(data_vio, submit_data_vio);
320}
321
322/**
323 * __submit_metadata_vio() - Submit I/O for a metadata vio.
324 * @vio: the vio for which to issue I/O
325 * @physical: the physical block number to read or write
326 * @callback: the bio endio function which will be called after the I/O completes
327 * @error_handler: the handler for submission or I/O errors (may be NULL)
328 * @operation: the type of I/O to perform
329 * @data: the buffer to read or write (may be NULL)
330 *
331 * The vio is enqueued on a vdo bio queue so that bio submission (which may block) does not block
332 * other vdo threads.
333 *
334 * That the error handler will run on the correct thread is only true so long as the thread calling
335 * this function, and the thread set in the endio callback are the same, as well as the fact that
336 * no error can occur on the bio queue. Currently this is true for all callers, but additional care
337 * will be needed if this ever changes.
338 */
339void __submit_metadata_vio(struct vio *vio, physical_block_number_t physical,
340			   bio_end_io_t callback, vdo_action_fn error_handler,
341			   blk_opf_t operation, char *data)
342{
343	int result;
344	struct vdo_completion *completion = &vio->completion;
345	const struct admin_state_code *code = vdo_get_admin_state(completion->vdo);
346
347
348	VDO_ASSERT_LOG_ONLY(!code->quiescent, "I/O not allowed in state %s", code->name);
349	VDO_ASSERT_LOG_ONLY(vio->bio->bi_next == NULL, "metadata bio has no next bio");
350
351	vdo_reset_completion(completion);
352	completion->error_handler = error_handler;
353	result = vio_reset_bio(vio, data, callback, operation | REQ_META, physical);
354	if (result != VDO_SUCCESS) {
355		continue_vio(vio, result);
356		return;
357	}
358
359	vdo_set_completion_callback(completion, vdo_submit_vio,
360				    get_vio_bio_zone_thread_id(vio));
361	vdo_launch_completion_with_priority(completion, get_metadata_priority(vio));
362}
363
364/**
365 * vdo_make_io_submitter() - Create an io_submitter structure.
366 * @thread_count: Number of bio-submission threads to set up.
367 * @rotation_interval: Interval to use when rotating between bio-submission threads when enqueuing
368 *                     completions.
369 * @max_requests_active: Number of bios for merge tracking.
370 * @vdo: The vdo which will use this submitter.
371 * @io_submitter: pointer to the new data structure.
372 *
373 * Return: VDO_SUCCESS or an error.
374 */
375int vdo_make_io_submitter(unsigned int thread_count, unsigned int rotation_interval,
376			  unsigned int max_requests_active, struct vdo *vdo,
377			  struct io_submitter **io_submitter_ptr)
378{
379	unsigned int i;
380	struct io_submitter *io_submitter;
381	int result;
382
383	result = vdo_allocate_extended(struct io_submitter, thread_count,
384				       struct bio_queue_data, "bio submission data",
385				       &io_submitter);
386	if (result != VDO_SUCCESS)
387		return result;
388
389	io_submitter->bio_queue_rotation_interval = rotation_interval;
390
391	/* Setup for each bio-submission work queue */
392	for (i = 0; i < thread_count; i++) {
393		struct bio_queue_data *bio_queue_data = &io_submitter->bio_queue_data[i];
394
395		mutex_init(&bio_queue_data->lock);
396		/*
397		 * One I/O operation per request, but both first & last sector numbers.
398		 *
399		 * If requests are assigned to threads round-robin, they should be distributed
400		 * quite evenly. But if they're assigned based on PBN, things can sometimes be very
401		 * uneven. So for now, we'll assume that all requests *may* wind up on one thread,
402		 * and thus all in the same map.
403		 */
404		result = vdo_int_map_create(max_requests_active * 2,
405					    &bio_queue_data->map);
406		if (result != VDO_SUCCESS) {
407			/*
408			 * Clean up the partially initialized bio-queue entirely and indicate that
409			 * initialization failed.
410			 */
411			vdo_log_error("bio map initialization failed %d", result);
412			vdo_cleanup_io_submitter(io_submitter);
413			vdo_free_io_submitter(io_submitter);
414			return result;
415		}
416
417		bio_queue_data->queue_number = i;
418		result = vdo_make_thread(vdo, vdo->thread_config.bio_threads[i],
419					 &bio_queue_type, 1, (void **) &bio_queue_data);
420		if (result != VDO_SUCCESS) {
421			/*
422			 * Clean up the partially initialized bio-queue entirely and indicate that
423			 * initialization failed.
424			 */
425			vdo_int_map_free(vdo_forget(bio_queue_data->map));
426			vdo_log_error("bio queue initialization failed %d", result);
427			vdo_cleanup_io_submitter(io_submitter);
428			vdo_free_io_submitter(io_submitter);
429			return result;
430		}
431
432		bio_queue_data->queue = vdo->threads[vdo->thread_config.bio_threads[i]].queue;
433		io_submitter->num_bio_queues_used++;
434	}
435
436	*io_submitter_ptr = io_submitter;
437
438	return VDO_SUCCESS;
439}
440
441/**
442 * vdo_cleanup_io_submitter() - Tear down the io_submitter fields as needed for a physical layer.
443 * @io_submitter: The I/O submitter data to tear down (may be NULL).
444 */
445void vdo_cleanup_io_submitter(struct io_submitter *io_submitter)
446{
447	int i;
448
449	if (io_submitter == NULL)
450		return;
451
452	for (i = io_submitter->num_bio_queues_used - 1; i >= 0; i--)
453		vdo_finish_work_queue(io_submitter->bio_queue_data[i].queue);
454}
455
456/**
457 * vdo_free_io_submitter() - Free the io_submitter fields and structure as needed.
458 * @io_submitter: The I/O submitter data to destroy.
459 *
460 * This must be called after vdo_cleanup_io_submitter(). It is used to release resources late in
461 * the shutdown process to avoid or reduce the chance of race conditions.
462 */
463void vdo_free_io_submitter(struct io_submitter *io_submitter)
464{
465	int i;
466
467	if (io_submitter == NULL)
468		return;
469
470	for (i = io_submitter->num_bio_queues_used - 1; i >= 0; i--) {
471		io_submitter->num_bio_queues_used--;
472		/* vdo_destroy() will free the work queue, so just give up our reference to it. */
473		vdo_forget(io_submitter->bio_queue_data[i].queue);
474		vdo_int_map_free(vdo_forget(io_submitter->bio_queue_data[i].map));
475	}
476	vdo_free(io_submitter);
477}
478