1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * Copyright 2023 Red Hat
4 */
5
6#include "data-vio.h"
7
8#include <linux/atomic.h>
9#include <linux/bio.h>
10#include <linux/blkdev.h>
11#include <linux/delay.h>
12#include <linux/device-mapper.h>
13#include <linux/jiffies.h>
14#include <linux/kernel.h>
15#include <linux/list.h>
16#include <linux/lz4.h>
17#include <linux/minmax.h>
18#include <linux/sched.h>
19#include <linux/spinlock.h>
20#include <linux/wait.h>
21
22#include "logger.h"
23#include "memory-alloc.h"
24#include "murmurhash3.h"
25#include "permassert.h"
26
27#include "block-map.h"
28#include "dump.h"
29#include "encodings.h"
30#include "int-map.h"
31#include "io-submitter.h"
32#include "logical-zone.h"
33#include "packer.h"
34#include "recovery-journal.h"
35#include "slab-depot.h"
36#include "status-codes.h"
37#include "types.h"
38#include "vdo.h"
39#include "vio.h"
40#include "wait-queue.h"
41
/**
 * DOC: Bio flags.
 *
 * For certain flags set on user bios, if the user bio has not yet been acknowledged, setting those
 * flags on our own bio(s) for that request may help underlying layers better fulfill the user
 * bio's needs. PASSTHROUGH_FLAGS below aggregates those flags; VDO strips all the other flags, as
 * they convey incorrect information.
 *
 * These flags are only hints about IO importance, so they are irrelevant once the user bio has
 * been finished: any IO still remaining at that point no longer needs to convey how important
 * finishing the user bio was.
 *
 * Note that bio.c contains the complete list of flags we believe may be set; the following list
 * explains the action taken with each of those flags VDO could receive:
 *
 * * REQ_SYNC: Passed down if the user bio is not yet completed, since it indicates the user bio
 *   completion is required for further work to be done by the issuer.
 * * REQ_META: Passed down if the user bio is not yet completed, since it may mean the lower layer
 *   treats it as more urgent, similar to REQ_SYNC.
 * * REQ_PRIO: Passed down if the user bio is not yet completed, since it indicates the user bio is
 *   important.
 * * REQ_NOMERGE: Set only if the incoming bio was split; irrelevant to VDO IO.
 * * REQ_IDLE: Set if the incoming bio had more IO quickly following; VDO's IO pattern doesn't
 *   match incoming IO, so this flag is incorrect for it.
 * * REQ_FUA: Handled separately, and irrelevant to VDO IO otherwise.
 * * REQ_RAHEAD: Passed down, as, for reads, it indicates trivial importance.
 * * REQ_BACKGROUND: Not passed down, as VIOs are a limited resource and VDO needs them recycled
 *   ASAP to service heavy load, which is the only place where REQ_BACKGROUND might aid in load
 *   prioritization.
 */
static blk_opf_t PASSTHROUGH_FLAGS = (REQ_PRIO | REQ_META | REQ_SYNC | REQ_RAHEAD);
73
/**
 * DOC:
 *
 * The data_vio_pool maintains the pool of data_vios which a vdo uses to service incoming bios. For
 * correctness, and in order to avoid potentially expensive or blocking memory allocations during
 * normal operation, the number of concurrently active data_vios is capped. Furthermore, in order
 * to avoid starvation of reads and writes, at most 75% of the data_vios may be used for
 * discards. The data_vio_pool is responsible for enforcing these limits. Threads submitting bios
 * for which a data_vio or discard permit is not available will block until the necessary
 * resources are available. The pool is also responsible for distributing resources to blocked
 * threads and waking them. Finally, the pool attempts to batch the work of recycling data_vios by
 * performing the work of actually assigning resources to blocked threads or placing data_vios back
 * into the pool on a single cpu at a time.
 *
 * The pool contains two "limiters", one for tracking data_vios and one for tracking discard
 * permits. The limiters also provide safe cross-thread access to pool statistics without the need
 * to take the pool's lock. When a thread submits a bio to a vdo device, it will first attempt to
 * get a discard permit if it is a discard, and then to get a data_vio. If the necessary resources
 * are available, the incoming bio will be assigned to the acquired data_vio, and it will be
 * launched. However, if either of these is unavailable, the arrival time of the bio is recorded
 * in the bio's bi_private field, the bio and its submitter are both queued on the appropriate
 * limiter, and the submitting thread will then put itself to sleep. (Note that this mechanism
 * will break if jiffies is only 32 bits.)
 *
 * Whenever a data_vio has completed processing for the bio it was servicing, release_data_vio()
 * will be called on it. This function will add the data_vio to a funnel queue, and then check the
 * state of the pool. If the pool is not currently processing released data_vios, the pool's
 * completion will be enqueued on a cpu queue. This obviates the need for the releasing threads to
 * hold the pool's lock, and also batches release work while avoiding starvation of the cpu
 * threads.
 *
 * Whenever the pool's completion is run on a cpu thread, it calls process_release_callback() which
 * processes a batch of returned data_vios (at most DATA_VIO_RELEASE_BATCH_SIZE of them) from the
 * pool's funnel queue. For each data_vio, it first checks whether that data_vio was processing a
 * discard. If so, and there is a blocked bio waiting for a discard permit, that permit is
 * notionally transferred to the eldest discard waiter, and that waiter is moved to the end of the
 * list of discard bios waiting for a data_vio. If there are no discard waiters, the discard permit
 * is returned to the pool. Next, the data_vio is assigned to the oldest blocked bio which either
 * has a discard permit or doesn't need one, and that bio is relaunched. If no such bio exists, the
 * data_vio is returned to the pool. Finally, if any waiting bios were launched, the threads which
 * blocked trying to submit them are awakened.
 */
116
117#define DATA_VIO_RELEASE_BATCH_SIZE 128
118
119static const unsigned int VDO_SECTORS_PER_BLOCK_MASK = VDO_SECTORS_PER_BLOCK - 1;
120static const u32 COMPRESSION_STATUS_MASK = 0xff;
121static const u32 MAY_NOT_COMPRESS_MASK = 0x80000000;
122
123struct limiter;
124typedef void (*assigner_fn)(struct limiter *limiter);
125
126/* Bookkeeping structure for a single type of resource. */
127struct limiter {
128	/* The data_vio_pool to which this limiter belongs */
129	struct data_vio_pool *pool;
130	/* The maximum number of data_vios available */
131	data_vio_count_t limit;
132	/* The number of resources in use */
133	data_vio_count_t busy;
134	/* The maximum number of resources ever simultaneously in use */
135	data_vio_count_t max_busy;
136	/* The number of resources to release */
137	data_vio_count_t release_count;
138	/* The number of waiters to wake */
139	data_vio_count_t wake_count;
140	/* The list of waiting bios which are known to process_release_callback() */
141	struct bio_list waiters;
142	/* The list of waiting bios which are not yet known to process_release_callback() */
143	struct bio_list new_waiters;
144	/* The list of waiters which have their permits */
145	struct bio_list *permitted_waiters;
146	/* The function for assigning a resource to a waiter */
147	assigner_fn assigner;
148	/* The queue of blocked threads */
149	wait_queue_head_t blocked_threads;
150	/* The arrival time of the eldest waiter */
151	u64 arrival;
152};
153
154/*
155 * A data_vio_pool is a collection of preallocated data_vios which may be acquired from any thread,
156 * and are released in batches.
157 */
158struct data_vio_pool {
159	/* Completion for scheduling releases */
160	struct vdo_completion completion;
161	/* The administrative state of the pool */
162	struct admin_state state;
163	/* Lock protecting the pool */
164	spinlock_t lock;
165	/* The main limiter controlling the total data_vios in the pool. */
166	struct limiter limiter;
167	/* The limiter controlling data_vios for discard */
168	struct limiter discard_limiter;
169	/* The list of bios which have discard permits but still need a data_vio */
170	struct bio_list permitted_discards;
171	/* The list of available data_vios */
172	struct list_head available;
173	/* The queue of data_vios waiting to be returned to the pool */
174	struct funnel_queue *queue;
175	/* Whether the pool is processing, or scheduled to process releases */
176	atomic_t processing;
177	/* The data vios in the pool */
178	struct data_vio data_vios[];
179};
180
181static const char * const ASYNC_OPERATION_NAMES[] = {
182	"launch",
183	"acknowledge_write",
184	"acquire_hash_lock",
185	"attempt_logical_block_lock",
186	"lock_duplicate_pbn",
187	"check_for_duplication",
188	"cleanup",
189	"compress_data_vio",
190	"find_block_map_slot",
191	"get_mapped_block_for_read",
192	"get_mapped_block_for_write",
193	"hash_data_vio",
194	"journal_remapping",
195	"vdo_attempt_packing",
196	"put_mapped_block",
197	"read_data_vio",
198	"update_dedupe_index",
199	"update_reference_counts",
200	"verify_duplication",
201	"write_data_vio",
202};
203
/* The steps taken when cleaning up a data_vio, in the order they are performed. */
205enum data_vio_cleanup_stage {
206	VIO_CLEANUP_START,
207	VIO_RELEASE_HASH_LOCK = VIO_CLEANUP_START,
208	VIO_RELEASE_ALLOCATED,
209	VIO_RELEASE_RECOVERY_LOCKS,
210	VIO_RELEASE_LOGICAL,
211	VIO_CLEANUP_DONE
212};
213
214static inline struct data_vio_pool * __must_check
215as_data_vio_pool(struct vdo_completion *completion)
216{
217	vdo_assert_completion_type(completion, VDO_DATA_VIO_POOL_COMPLETION);
218	return container_of(completion, struct data_vio_pool, completion);
219}
220
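/*
 * The arrival time is stored in the user bio's bi_private field by vdo_launch_bio() (as the
 * jiffies value at submission, cast to a pointer) so that blocked bios can be serviced in
 * arrival order.
 */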
221static inline u64 get_arrival_time(struct bio *bio)
222{
223	return (u64) bio->bi_private;
224}
225
226/**
227 * check_for_drain_complete_locked() - Check whether a data_vio_pool has no outstanding data_vios
228 *				       or waiters while holding the pool's lock.
229 */
230static bool check_for_drain_complete_locked(struct data_vio_pool *pool)
231{
232	if (pool->limiter.busy > 0)
233		return false;
234
235	VDO_ASSERT_LOG_ONLY((pool->discard_limiter.busy == 0),
236			    "no outstanding discard permits");
237
238	return (bio_list_empty(&pool->limiter.new_waiters) &&
239		bio_list_empty(&pool->discard_limiter.new_waiters));
240}
241
242static void initialize_lbn_lock(struct data_vio *data_vio, logical_block_number_t lbn)
243{
244	struct vdo *vdo = vdo_from_data_vio(data_vio);
245	zone_count_t zone_number;
246	struct lbn_lock *lock = &data_vio->logical;
247
248	lock->lbn = lbn;
249	lock->locked = false;
250	vdo_waitq_init(&lock->waiters);
251	zone_number = vdo_compute_logical_zone(data_vio);
252	lock->zone = &vdo->logical_zones->zones[zone_number];
253}
254
255static void launch_locked_request(struct data_vio *data_vio)
256{
257	data_vio->logical.locked = true;
258	if (data_vio->write) {
259		struct vdo *vdo = vdo_from_data_vio(data_vio);
260
261		if (vdo_is_read_only(vdo)) {
262			continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
263			return;
264		}
265	}
266
267	data_vio->last_async_operation = VIO_ASYNC_OP_FIND_BLOCK_MAP_SLOT;
268	vdo_find_block_map_slot(data_vio);
269}
270
271static void acknowledge_data_vio(struct data_vio *data_vio)
272{
273	struct vdo *vdo = vdo_from_data_vio(data_vio);
274	struct bio *bio = data_vio->user_bio;
275	int error = vdo_status_to_errno(data_vio->vio.completion.result);
276
277	if (bio == NULL)
278		return;
279
280	VDO_ASSERT_LOG_ONLY((data_vio->remaining_discard <=
281			     (u32) (VDO_BLOCK_SIZE - data_vio->offset)),
282			    "data_vio to acknowledge is not an incomplete discard");
283
284	data_vio->user_bio = NULL;
285	vdo_count_bios(&vdo->stats.bios_acknowledged, bio);
286	if (data_vio->is_partial)
287		vdo_count_bios(&vdo->stats.bios_acknowledged_partial, bio);
288
289	bio->bi_status = errno_to_blk_status(error);
290	bio_endio(bio);
291}
292
293static void copy_to_bio(struct bio *bio, char *data_ptr)
294{
295	struct bio_vec biovec;
296	struct bvec_iter iter;
297
298	bio_for_each_segment(biovec, bio, iter) {
299		memcpy_to_bvec(&biovec, data_ptr);
300		data_ptr += biovec.bv_len;
301	}
302}
303
304struct data_vio_compression_status get_data_vio_compression_status(struct data_vio *data_vio)
305{
306	u32 packed = atomic_read(&data_vio->compression.status);
307
308	/* pairs with cmpxchg in set_data_vio_compression_status */
309	smp_rmb();
310	return (struct data_vio_compression_status) {
311		.stage = packed & COMPRESSION_STATUS_MASK,
312		.may_not_compress = ((packed & MAY_NOT_COMPRESS_MASK) != 0),
313	};
314}
315
/**
 * pack_status() - Convert a data_vio_compression_status into a u32 which may be stored
 *                 atomically.
 * @status: The status to convert.
 *
 * Return: The compression status packed into a u32.
 */
323static u32 __must_check pack_status(struct data_vio_compression_status status)
324{
325	return status.stage | (status.may_not_compress ? MAY_NOT_COMPRESS_MASK : 0);
326}
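/*
 * For example, a status in stage DATA_VIO_PACKING with may_not_compress set packs to
 * (MAY_NOT_COMPRESS_MASK | DATA_VIO_PACKING): the stage occupies the low byte
 * (COMPRESSION_STATUS_MASK) and the may-not-compress flag occupies the top bit.
 */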
327
/**
 * set_data_vio_compression_status() - Set the compression status of a data_vio.
 * @data_vio: The data_vio to change.
 * @status: The expected current status of the data_vio.
 * @new_status: The status to set.
 *
 * Return: true if the new status was set, false if the data_vio's compression status did not
 *         match the expected status, and so was left unchanged.
 */
336static bool __must_check
337set_data_vio_compression_status(struct data_vio *data_vio,
338				struct data_vio_compression_status status,
339				struct data_vio_compression_status new_status)
340{
341	u32 actual;
342	u32 expected = pack_status(status);
343	u32 replacement = pack_status(new_status);
344
	/*
	 * Extra barriers because this was originally developed using a CAS operation that
	 * implicitly had them.
	 */
349	smp_mb__before_atomic();
350	actual = atomic_cmpxchg(&data_vio->compression.status, expected, replacement);
351	/* same as before_atomic */
352	smp_mb__after_atomic();
353	return (expected == actual);
354}
355
356struct data_vio_compression_status advance_data_vio_compression_stage(struct data_vio *data_vio)
357{
358	for (;;) {
359		struct data_vio_compression_status status =
360			get_data_vio_compression_status(data_vio);
361		struct data_vio_compression_status new_status = status;
362
363		if (status.stage == DATA_VIO_POST_PACKER) {
364			/* We're already in the last stage. */
365			return status;
366		}
367
368		if (status.may_not_compress) {
			/*
			 * Compression has been disallowed for this VIO, so skip the rest of the
			 * path and go to the end.
			 */
373			new_status.stage = DATA_VIO_POST_PACKER;
374		} else {
			/* Go to the next stage. */
376			new_status.stage++;
377		}
378
379		if (set_data_vio_compression_status(data_vio, status, new_status))
380			return new_status;
381
382		/* Another thread changed the status out from under us so try again. */
383	}
384}
385
/**
 * cancel_data_vio_compression() - Prevent this data_vio from being compressed or packed.
 * @data_vio: The data_vio to cancel.
 *
 * Return: true if the data_vio is in the packer and the caller was the first caller to cancel it.
 */
391bool cancel_data_vio_compression(struct data_vio *data_vio)
392{
393	struct data_vio_compression_status status, new_status;
394
395	for (;;) {
396		status = get_data_vio_compression_status(data_vio);
397		if (status.may_not_compress || (status.stage == DATA_VIO_POST_PACKER)) {
398			/* This data_vio is already set up to not block in the packer. */
399			break;
400		}
401
402		new_status.stage = status.stage;
403		new_status.may_not_compress = true;
404
405		if (set_data_vio_compression_status(data_vio, status, new_status))
406			break;
407	}
408
409	return ((status.stage == DATA_VIO_PACKING) && !status.may_not_compress);
410}
411
412/**
413 * attempt_logical_block_lock() - Attempt to acquire the lock on a logical block.
414 * @completion: The data_vio for an external data request as a completion.
415 *
416 * This is the start of the path for all external requests. It is registered in launch_data_vio().
417 */
418static void attempt_logical_block_lock(struct vdo_completion *completion)
419{
420	struct data_vio *data_vio = as_data_vio(completion);
421	struct lbn_lock *lock = &data_vio->logical;
422	struct vdo *vdo = vdo_from_data_vio(data_vio);
423	struct data_vio *lock_holder;
424	int result;
425
426	assert_data_vio_in_logical_zone(data_vio);
427
428	if (data_vio->logical.lbn >= vdo->states.vdo.config.logical_blocks) {
429		continue_data_vio_with_error(data_vio, VDO_OUT_OF_RANGE);
430		return;
431	}
432
433	result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
434				 data_vio, false, (void **) &lock_holder);
435	if (result != VDO_SUCCESS) {
436		continue_data_vio_with_error(data_vio, result);
437		return;
438	}
439
440	if (lock_holder == NULL) {
441		/* We got the lock */
442		launch_locked_request(data_vio);
443		return;
444	}
445
446	result = VDO_ASSERT(lock_holder->logical.locked, "logical block lock held");
447	if (result != VDO_SUCCESS) {
448		continue_data_vio_with_error(data_vio, result);
449		return;
450	}
451
452	/*
453	 * If the new request is a pure read request (not read-modify-write) and the lock_holder is
454	 * writing and has received an allocation, service the read request immediately by copying
455	 * data from the lock_holder to avoid having to flush the write out of the packer just to
456	 * prevent the read from waiting indefinitely. If the lock_holder does not yet have an
457	 * allocation, prevent it from blocking in the packer and wait on it. This is necessary in
458	 * order to prevent returning data that may not have actually been written.
459	 */
460	if (!data_vio->write && READ_ONCE(lock_holder->allocation_succeeded)) {
461		copy_to_bio(data_vio->user_bio, lock_holder->vio.data + data_vio->offset);
462		acknowledge_data_vio(data_vio);
463		complete_data_vio(completion);
464		return;
465	}
466
467	data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_LOGICAL_BLOCK_LOCK;
468	vdo_waitq_enqueue_waiter(&lock_holder->logical.waiters, &data_vio->waiter);
469
470	/*
471	 * Prevent writes and read-modify-writes from blocking indefinitely on lock holders in the
472	 * packer.
473	 */
474	if (lock_holder->write && cancel_data_vio_compression(lock_holder)) {
475		data_vio->compression.lock_holder = lock_holder;
476		launch_data_vio_packer_callback(data_vio,
477						vdo_remove_lock_holder_from_packer);
478	}
479}
480
481/**
482 * launch_data_vio() - (Re)initialize a data_vio to have a new logical block number, keeping the
483 *		       same parent and other state and send it on its way.
484 */
485static void launch_data_vio(struct data_vio *data_vio, logical_block_number_t lbn)
486{
487	struct vdo_completion *completion = &data_vio->vio.completion;
488
489	/*
490	 * Clearing the tree lock must happen before initializing the LBN lock, which also adds
491	 * information to the tree lock.
492	 */
493	memset(&data_vio->tree_lock, 0, sizeof(data_vio->tree_lock));
494	initialize_lbn_lock(data_vio, lbn);
495	INIT_LIST_HEAD(&data_vio->hash_lock_entry);
496	INIT_LIST_HEAD(&data_vio->write_entry);
497
498	memset(&data_vio->allocation, 0, sizeof(data_vio->allocation));
499
500	data_vio->is_duplicate = false;
501
502	memset(&data_vio->record_name, 0, sizeof(data_vio->record_name));
503	memset(&data_vio->duplicate, 0, sizeof(data_vio->duplicate));
504	vdo_reset_completion(completion);
505	completion->error_handler = handle_data_vio_error;
506	set_data_vio_logical_callback(data_vio, attempt_logical_block_lock);
507	vdo_enqueue_completion(completion, VDO_DEFAULT_Q_MAP_BIO_PRIORITY);
508}
509
510static bool is_zero_block(char *block)
511{
512	int i;
513
514	for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) {
515		if (*((u64 *) &block[i]))
516			return false;
517	}
518
519	return true;
520}
521
522static void copy_from_bio(struct bio *bio, char *data_ptr)
523{
524	struct bio_vec biovec;
525	struct bvec_iter iter;
526
527	bio_for_each_segment(biovec, bio, iter) {
528		memcpy_from_bvec(data_ptr, &biovec);
529		data_ptr += biovec.bv_len;
530	}
531}
532
533static void launch_bio(struct vdo *vdo, struct data_vio *data_vio, struct bio *bio)
534{
535	logical_block_number_t lbn;
536	/*
537	 * Zero out the fields which don't need to be preserved (i.e. which are not pointers to
538	 * separately allocated objects).
539	 */
540	memset(data_vio, 0, offsetof(struct data_vio, vio));
541	memset(&data_vio->compression, 0, offsetof(struct compression_state, block));
542
543	data_vio->user_bio = bio;
544	data_vio->offset = to_bytes(bio->bi_iter.bi_sector & VDO_SECTORS_PER_BLOCK_MASK);
545	data_vio->is_partial = (bio->bi_iter.bi_size < VDO_BLOCK_SIZE) || (data_vio->offset != 0);
546
	/*
	 * Discards behave very differently from other requests coming in from device-mapper. We
	 * have to be able to handle discards of any size and at various sector offsets within a
	 * block.
	 */
552	if (bio_op(bio) == REQ_OP_DISCARD) {
553		data_vio->remaining_discard = bio->bi_iter.bi_size;
554		data_vio->write = true;
555		data_vio->is_discard = true;
556		if (data_vio->is_partial) {
557			vdo_count_bios(&vdo->stats.bios_in_partial, bio);
558			data_vio->read = true;
559		}
560	} else if (data_vio->is_partial) {
561		vdo_count_bios(&vdo->stats.bios_in_partial, bio);
562		data_vio->read = true;
563		if (bio_data_dir(bio) == WRITE)
564			data_vio->write = true;
565	} else if (bio_data_dir(bio) == READ) {
566		data_vio->read = true;
567	} else {
568		/*
569		 * Copy the bio data to a char array so that we can continue to use the data after
570		 * we acknowledge the bio.
571		 */
572		copy_from_bio(bio, data_vio->vio.data);
573		data_vio->is_zero = is_zero_block(data_vio->vio.data);
574		data_vio->write = true;
575	}
576
577	if (data_vio->user_bio->bi_opf & REQ_FUA)
578		data_vio->fua = true;
579
580	lbn = (bio->bi_iter.bi_sector - vdo->starting_sector_offset) / VDO_SECTORS_PER_BLOCK;
581	launch_data_vio(data_vio, lbn);
582}
583
584static void assign_data_vio(struct limiter *limiter, struct data_vio *data_vio)
585{
586	struct bio *bio = bio_list_pop(limiter->permitted_waiters);
587
588	launch_bio(limiter->pool->completion.vdo, data_vio, bio);
589	limiter->wake_count++;
590
591	bio = bio_list_peek(limiter->permitted_waiters);
592	limiter->arrival = ((bio == NULL) ? U64_MAX : get_arrival_time(bio));
593}
594
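/*
 * Transfer a discard permit to the eldest waiter by moving its bio to the permitted list; the bio
 * will be launched once a data_vio is also available.
 */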
595static void assign_discard_permit(struct limiter *limiter)
596{
597	struct bio *bio = bio_list_pop(&limiter->waiters);
598
599	if (limiter->arrival == U64_MAX)
600		limiter->arrival = get_arrival_time(bio);
601
602	bio_list_add(limiter->permitted_waiters, bio);
603}
604
605static void get_waiters(struct limiter *limiter)
606{
607	bio_list_merge(&limiter->waiters, &limiter->new_waiters);
608	bio_list_init(&limiter->new_waiters);
609}
610
611static inline struct data_vio *get_available_data_vio(struct data_vio_pool *pool)
612{
613	struct data_vio *data_vio =
614		list_first_entry(&pool->available, struct data_vio, pool_entry);
615
616	list_del_init(&data_vio->pool_entry);
617	return data_vio;
618}
619
620static void assign_data_vio_to_waiter(struct limiter *limiter)
621{
622	assign_data_vio(limiter, get_available_data_vio(limiter->pool));
623}
624
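/**
 * update_limiter() - Process a limiter's pending releases and assign resources to its waiters.
 *
 * As an illustrative example: with limit 16, busy 10, release_count 3, and 5 waiters, the first
 * loop hands the 3 released resources to the eldest waiters, the second loop assigns 2 of the 6
 * previously idle resources to the remaining waiters, and busy is recomputed as
 * limit - available = 12.
 */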
625static void update_limiter(struct limiter *limiter)
626{
627	struct bio_list *waiters = &limiter->waiters;
628	data_vio_count_t available = limiter->limit - limiter->busy;
629
630	VDO_ASSERT_LOG_ONLY((limiter->release_count <= limiter->busy),
631			    "Release count %u is not more than busy count %u",
632			    limiter->release_count, limiter->busy);
633
634	get_waiters(limiter);
635	for (; (limiter->release_count > 0) && !bio_list_empty(waiters); limiter->release_count--)
636		limiter->assigner(limiter);
637
638	if (limiter->release_count > 0) {
639		WRITE_ONCE(limiter->busy, limiter->busy - limiter->release_count);
640		limiter->release_count = 0;
641		return;
642	}
643
644	for (; (available > 0) && !bio_list_empty(waiters); available--)
645		limiter->assigner(limiter);
646
647	WRITE_ONCE(limiter->busy, limiter->limit - available);
648	if (limiter->max_busy < limiter->busy)
649		WRITE_ONCE(limiter->max_busy, limiter->busy);
650}
651
652/**
653 * schedule_releases() - Ensure that release processing is scheduled.
654 *
655 * If this call switches the state to processing, enqueue. Otherwise, some other thread has already
656 * done so.
657 */
658static void schedule_releases(struct data_vio_pool *pool)
659{
660	/* Pairs with the barrier in process_release_callback(). */
661	smp_mb__before_atomic();
662	if (atomic_cmpxchg(&pool->processing, false, true))
663		return;
664
665	pool->completion.requeue = true;
666	vdo_launch_completion_with_priority(&pool->completion,
667					    CPU_Q_COMPLETE_VIO_PRIORITY);
668}
669
670static void reuse_or_release_resources(struct data_vio_pool *pool,
671				       struct data_vio *data_vio,
672				       struct list_head *returned)
673{
674	if (data_vio->remaining_discard > 0) {
675		if (bio_list_empty(&pool->discard_limiter.waiters)) {
676			/* Return the data_vio's discard permit. */
677			pool->discard_limiter.release_count++;
678		} else {
679			assign_discard_permit(&pool->discard_limiter);
680		}
681	}
682
683	if (pool->limiter.arrival < pool->discard_limiter.arrival) {
684		assign_data_vio(&pool->limiter, data_vio);
685	} else if (pool->discard_limiter.arrival < U64_MAX) {
686		assign_data_vio(&pool->discard_limiter, data_vio);
687	} else {
688		list_add(&data_vio->pool_entry, returned);
689		pool->limiter.release_count++;
690	}
691}
692
693/**
694 * process_release_callback() - Process a batch of data_vio releases.
695 * @completion: The pool with data_vios to release.
696 */
697static void process_release_callback(struct vdo_completion *completion)
698{
699	struct data_vio_pool *pool = as_data_vio_pool(completion);
700	bool reschedule;
701	bool drained;
702	data_vio_count_t processed;
703	data_vio_count_t to_wake;
704	data_vio_count_t discards_to_wake;
705	LIST_HEAD(returned);
706
707	spin_lock(&pool->lock);
708	get_waiters(&pool->discard_limiter);
709	get_waiters(&pool->limiter);
710	spin_unlock(&pool->lock);
711
712	if (pool->limiter.arrival == U64_MAX) {
713		struct bio *bio = bio_list_peek(&pool->limiter.waiters);
714
715		if (bio != NULL)
716			pool->limiter.arrival = get_arrival_time(bio);
717	}
718
719	for (processed = 0; processed < DATA_VIO_RELEASE_BATCH_SIZE; processed++) {
720		struct data_vio *data_vio;
721		struct funnel_queue_entry *entry = vdo_funnel_queue_poll(pool->queue);
722
723		if (entry == NULL)
724			break;
725
726		data_vio = as_data_vio(container_of(entry, struct vdo_completion,
727						    work_queue_entry_link));
728		acknowledge_data_vio(data_vio);
729		reuse_or_release_resources(pool, data_vio, &returned);
730	}
731
732	spin_lock(&pool->lock);
733	/*
734	 * There is a race where waiters could be added while we are in the unlocked section above.
735	 * Those waiters could not see the resources we are now about to release, so we assign
736	 * those resources now as we have no guarantee of being rescheduled. This is handled in
737	 * update_limiter().
738	 */
739	update_limiter(&pool->discard_limiter);
740	list_splice(&returned, &pool->available);
741	update_limiter(&pool->limiter);
742	to_wake = pool->limiter.wake_count;
743	pool->limiter.wake_count = 0;
744	discards_to_wake = pool->discard_limiter.wake_count;
745	pool->discard_limiter.wake_count = 0;
746
747	atomic_set(&pool->processing, false);
748	/* Pairs with the barrier in schedule_releases(). */
749	smp_mb();
750
751	reschedule = !vdo_is_funnel_queue_empty(pool->queue);
752	drained = (!reschedule &&
753		   vdo_is_state_draining(&pool->state) &&
754		   check_for_drain_complete_locked(pool));
755	spin_unlock(&pool->lock);
756
757	if (to_wake > 0)
758		wake_up_nr(&pool->limiter.blocked_threads, to_wake);
759
760	if (discards_to_wake > 0)
761		wake_up_nr(&pool->discard_limiter.blocked_threads, discards_to_wake);
762
763	if (reschedule)
764		schedule_releases(pool);
765	else if (drained)
766		vdo_finish_draining(&pool->state);
767}
768
769static void initialize_limiter(struct limiter *limiter, struct data_vio_pool *pool,
770			       assigner_fn assigner, data_vio_count_t limit)
771{
772	limiter->pool = pool;
773	limiter->assigner = assigner;
774	limiter->limit = limit;
775	limiter->arrival = U64_MAX;
776	init_waitqueue_head(&limiter->blocked_threads);
777}
778
779/**
780 * initialize_data_vio() - Allocate the components of a data_vio.
781 *
782 * The caller is responsible for cleaning up the data_vio on error.
783 *
784 * Return: VDO_SUCCESS or an error.
785 */
786static int initialize_data_vio(struct data_vio *data_vio, struct vdo *vdo)
787{
788	struct bio *bio;
789	int result;
790
791	BUILD_BUG_ON(VDO_BLOCK_SIZE > PAGE_SIZE);
792	result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "data_vio data",
793				     &data_vio->vio.data);
794	if (result != VDO_SUCCESS)
795		return vdo_log_error_strerror(result,
796					      "data_vio data allocation failure");
797
798	result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "compressed block",
799				     &data_vio->compression.block);
800	if (result != VDO_SUCCESS) {
801		return vdo_log_error_strerror(result,
802					      "data_vio compressed block allocation failure");
803	}
804
805	result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "vio scratch",
806				     &data_vio->scratch_block);
807	if (result != VDO_SUCCESS)
808		return vdo_log_error_strerror(result,
809					      "data_vio scratch allocation failure");
810
811	result = vdo_create_bio(&bio);
812	if (result != VDO_SUCCESS)
813		return vdo_log_error_strerror(result,
814					      "data_vio data bio allocation failure");
815
816	vdo_initialize_completion(&data_vio->decrement_completion, vdo,
817				  VDO_DECREMENT_COMPLETION);
818	initialize_vio(&data_vio->vio, bio, 1, VIO_TYPE_DATA, VIO_PRIORITY_DATA, vdo);
819
820	return VDO_SUCCESS;
821}
822
823static void destroy_data_vio(struct data_vio *data_vio)
824{
825	if (data_vio == NULL)
826		return;
827
828	vdo_free_bio(vdo_forget(data_vio->vio.bio));
829	vdo_free(vdo_forget(data_vio->vio.data));
830	vdo_free(vdo_forget(data_vio->compression.block));
831	vdo_free(vdo_forget(data_vio->scratch_block));
832}
833
/**
 * make_data_vio_pool() - Initialize a data_vio pool.
 * @vdo: The vdo to which the pool will belong.
 * @pool_size: The number of data_vios in the pool.
 * @discard_limit: The maximum number of data_vios which may be used for discards.
 * @pool_ptr: A pointer to hold the newly allocated pool.
 *
 * Return: VDO_SUCCESS or an error.
 */
841int make_data_vio_pool(struct vdo *vdo, data_vio_count_t pool_size,
842		       data_vio_count_t discard_limit, struct data_vio_pool **pool_ptr)
843{
844	int result;
845	struct data_vio_pool *pool;
846	data_vio_count_t i;
847
848	result = vdo_allocate_extended(struct data_vio_pool, pool_size, struct data_vio,
849				       __func__, &pool);
850	if (result != VDO_SUCCESS)
851		return result;
852
853	VDO_ASSERT_LOG_ONLY((discard_limit <= pool_size),
854			    "discard limit does not exceed pool size");
855	initialize_limiter(&pool->discard_limiter, pool, assign_discard_permit,
856			   discard_limit);
857	pool->discard_limiter.permitted_waiters = &pool->permitted_discards;
858	initialize_limiter(&pool->limiter, pool, assign_data_vio_to_waiter, pool_size);
859	pool->limiter.permitted_waiters = &pool->limiter.waiters;
860	INIT_LIST_HEAD(&pool->available);
861	spin_lock_init(&pool->lock);
862	vdo_set_admin_state_code(&pool->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
863	vdo_initialize_completion(&pool->completion, vdo, VDO_DATA_VIO_POOL_COMPLETION);
864	vdo_prepare_completion(&pool->completion, process_release_callback,
865			       process_release_callback, vdo->thread_config.cpu_thread,
866			       NULL);
867
868	result = vdo_make_funnel_queue(&pool->queue);
869	if (result != VDO_SUCCESS) {
870		free_data_vio_pool(vdo_forget(pool));
871		return result;
872	}
873
874	for (i = 0; i < pool_size; i++) {
875		struct data_vio *data_vio = &pool->data_vios[i];
876
877		result = initialize_data_vio(data_vio, vdo);
878		if (result != VDO_SUCCESS) {
879			destroy_data_vio(data_vio);
880			free_data_vio_pool(pool);
881			return result;
882		}
883
884		list_add(&data_vio->pool_entry, &pool->available);
885	}
886
887	*pool_ptr = pool;
888	return VDO_SUCCESS;
889}
890
891/**
892 * free_data_vio_pool() - Free a data_vio_pool and the data_vios in it.
893 *
894 * All data_vios must be returned to the pool before calling this function.
895 */
896void free_data_vio_pool(struct data_vio_pool *pool)
897{
898	struct data_vio *data_vio, *tmp;
899
900	if (pool == NULL)
901		return;
902
903	/*
904	 * Pairs with the barrier in process_release_callback(). Possibly not needed since it
905	 * caters to an enqueue vs. free race.
906	 */
907	smp_mb();
908	BUG_ON(atomic_read(&pool->processing));
909
910	spin_lock(&pool->lock);
911	VDO_ASSERT_LOG_ONLY((pool->limiter.busy == 0),
912			    "data_vio pool must not have %u busy entries when being freed",
913			    pool->limiter.busy);
914	VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->limiter.waiters) &&
915			     bio_list_empty(&pool->limiter.new_waiters)),
916			    "data_vio pool must not have threads waiting to read or write when being freed");
917	VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->discard_limiter.waiters) &&
918			     bio_list_empty(&pool->discard_limiter.new_waiters)),
919			    "data_vio pool must not have threads waiting to discard when being freed");
920	spin_unlock(&pool->lock);
921
922	list_for_each_entry_safe(data_vio, tmp, &pool->available, pool_entry) {
923		list_del_init(&data_vio->pool_entry);
924		destroy_data_vio(data_vio);
925	}
926
927	vdo_free_funnel_queue(vdo_forget(pool->queue));
928	vdo_free(pool);
929}
930
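/* Take one permit from a limiter, if available. The pool's lock must be held by the caller. */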
931static bool acquire_permit(struct limiter *limiter)
932{
933	if (limiter->busy >= limiter->limit)
934		return false;
935
936	WRITE_ONCE(limiter->busy, limiter->busy + 1);
937	if (limiter->max_busy < limiter->busy)
938		WRITE_ONCE(limiter->max_busy, limiter->busy);
939	return true;
940}
941
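/*
 * Add a bio to the limiter's list of new waiters and put the submitting thread to sleep. By the
 * time the thread is awakened by process_release_callback(), its bio will already have been
 * launched, so the caller has nothing more to do.
 */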
942static void wait_permit(struct limiter *limiter, struct bio *bio)
943	__releases(&limiter->pool->lock)
944{
945	DEFINE_WAIT(wait);
946
947	bio_list_add(&limiter->new_waiters, bio);
948	prepare_to_wait_exclusive(&limiter->blocked_threads, &wait,
949				  TASK_UNINTERRUPTIBLE);
950	spin_unlock(&limiter->pool->lock);
951	io_schedule();
952	finish_wait(&limiter->blocked_threads, &wait);
953}
954
955/**
956 * vdo_launch_bio() - Acquire a data_vio from the pool, assign the bio to it, and launch it.
957 *
958 * This will block if data_vios or discard permits are not available.
959 */
960void vdo_launch_bio(struct data_vio_pool *pool, struct bio *bio)
961{
962	struct data_vio *data_vio;
963
964	VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&pool->state),
965			    "data_vio_pool not quiescent on acquire");
966
967	bio->bi_private = (void *) jiffies;
968	spin_lock(&pool->lock);
969	if ((bio_op(bio) == REQ_OP_DISCARD) &&
970	    !acquire_permit(&pool->discard_limiter)) {
971		wait_permit(&pool->discard_limiter, bio);
972		return;
973	}
974
975	if (!acquire_permit(&pool->limiter)) {
976		wait_permit(&pool->limiter, bio);
977		return;
978	}
979
980	data_vio = get_available_data_vio(pool);
981	spin_unlock(&pool->lock);
982	launch_bio(pool->completion.vdo, data_vio, bio);
983}
984
985/* Implements vdo_admin_initiator_fn. */
986static void initiate_drain(struct admin_state *state)
987{
988	bool drained;
989	struct data_vio_pool *pool = container_of(state, struct data_vio_pool, state);
990
991	spin_lock(&pool->lock);
992	drained = check_for_drain_complete_locked(pool);
993	spin_unlock(&pool->lock);
994
995	if (drained)
996		vdo_finish_draining(state);
997}
998
999static void assert_on_vdo_cpu_thread(const struct vdo *vdo, const char *name)
1000{
1001	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.cpu_thread),
1002			    "%s called on cpu thread", name);
1003}
1004
1005/**
1006 * drain_data_vio_pool() - Wait asynchronously for all data_vios to be returned to the pool.
1007 * @completion: The completion to notify when the pool has drained.
1008 */
1009void drain_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
1010{
1011	assert_on_vdo_cpu_thread(completion->vdo, __func__);
1012	vdo_start_draining(&pool->state, VDO_ADMIN_STATE_SUSPENDING, completion,
1013			   initiate_drain);
1014}
1015
1016/**
1017 * resume_data_vio_pool() - Resume a data_vio pool.
1018 * @completion: The completion to notify when the pool has resumed.
1019 */
1020void resume_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion)
1021{
1022	assert_on_vdo_cpu_thread(completion->vdo, __func__);
1023	vdo_continue_completion(completion, vdo_resume_if_quiescent(&pool->state));
1024}
1025
1026static void dump_limiter(const char *name, struct limiter *limiter)
1027{
1028	vdo_log_info("%s: %u of %u busy (max %u), %s", name, limiter->busy,
1029		     limiter->limit, limiter->max_busy,
1030		     ((bio_list_empty(&limiter->waiters) &&
1031		       bio_list_empty(&limiter->new_waiters)) ?
1032		      "no waiters" : "has waiters"));
1033}
1034
1035/**
1036 * dump_data_vio_pool() - Dump a data_vio pool to the log.
1037 * @dump_vios: Whether to dump the details of each busy data_vio as well.
1038 */
1039void dump_data_vio_pool(struct data_vio_pool *pool, bool dump_vios)
1040{
1041	/*
1042	 * In order that syslog can empty its buffer, sleep after 35 elements for 4ms (till the
1043	 * second clock tick).  These numbers were picked based on experiments with lab machines.
1044	 */
1045	static const int ELEMENTS_PER_BATCH = 35;
1046	static const int SLEEP_FOR_SYSLOG = 4000;
1047
1048	if (pool == NULL)
1049		return;
1050
1051	spin_lock(&pool->lock);
1052	dump_limiter("data_vios", &pool->limiter);
1053	dump_limiter("discard permits", &pool->discard_limiter);
1054	if (dump_vios) {
1055		int i;
1056		int dumped = 0;
1057
1058		for (i = 0; i < pool->limiter.limit; i++) {
1059			struct data_vio *data_vio = &pool->data_vios[i];
1060
1061			if (!list_empty(&data_vio->pool_entry))
1062				continue;
1063
1064			dump_data_vio(data_vio);
1065			if (++dumped >= ELEMENTS_PER_BATCH) {
1066				spin_unlock(&pool->lock);
1067				dumped = 0;
1068				fsleep(SLEEP_FOR_SYSLOG);
1069				spin_lock(&pool->lock);
1070			}
1071		}
1072	}
1073
1074	spin_unlock(&pool->lock);
1075}
1076
1077data_vio_count_t get_data_vio_pool_active_discards(struct data_vio_pool *pool)
1078{
1079	return READ_ONCE(pool->discard_limiter.busy);
1080}
1081
1082data_vio_count_t get_data_vio_pool_discard_limit(struct data_vio_pool *pool)
1083{
1084	return READ_ONCE(pool->discard_limiter.limit);
1085}
1086
1087data_vio_count_t get_data_vio_pool_maximum_discards(struct data_vio_pool *pool)
1088{
1089	return READ_ONCE(pool->discard_limiter.max_busy);
1090}
1091
1092int set_data_vio_pool_discard_limit(struct data_vio_pool *pool, data_vio_count_t limit)
1093{
	if (get_data_vio_pool_request_limit(pool) < limit) {
		/* The discard limit may not be higher than the data_vio limit. */
		return -EINVAL;
	}
1098
1099	spin_lock(&pool->lock);
1100	pool->discard_limiter.limit = limit;
1101	spin_unlock(&pool->lock);
1102
1103	return VDO_SUCCESS;
1104}
1105
1106data_vio_count_t get_data_vio_pool_active_requests(struct data_vio_pool *pool)
1107{
1108	return READ_ONCE(pool->limiter.busy);
1109}
1110
1111data_vio_count_t get_data_vio_pool_request_limit(struct data_vio_pool *pool)
1112{
1113	return READ_ONCE(pool->limiter.limit);
1114}
1115
1116data_vio_count_t get_data_vio_pool_maximum_requests(struct data_vio_pool *pool)
1117{
1118	return READ_ONCE(pool->limiter.max_busy);
1119}
1120
1121static void update_data_vio_error_stats(struct data_vio *data_vio)
1122{
1123	u8 index = 0;
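	/*
	 * The operations array below is indexed by a bitwise encoding of the data_vio's flags:
	 * bit 0 for read, bit 1 for write, and bit 2 for fua. Index 4 is omitted since a fua
	 * request is always also a read or a write.
	 */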
1124	static const char * const operations[] = {
1125		[0] = "empty",
1126		[1] = "read",
1127		[2] = "write",
1128		[3] = "read-modify-write",
1129		[5] = "read+fua",
1130		[6] = "write+fua",
1131		[7] = "read-modify-write+fua",
1132	};
1133
1134	if (data_vio->read)
1135		index = 1;
1136
1137	if (data_vio->write)
1138		index += 2;
1139
1140	if (data_vio->fua)
1141		index += 4;
1142
1143	update_vio_error_stats(&data_vio->vio,
1144			       "Completing %s vio for LBN %llu with error after %s",
1145			       operations[index],
1146			       (unsigned long long) data_vio->logical.lbn,
1147			       get_data_vio_operation_name(data_vio));
1148}
1149
1150static void perform_cleanup_stage(struct data_vio *data_vio,
1151				  enum data_vio_cleanup_stage stage);
1152
1153/**
1154 * release_allocated_lock() - Release the PBN lock and/or the reference on the allocated block at
1155 *			      the end of processing a data_vio.
1156 */
1157static void release_allocated_lock(struct vdo_completion *completion)
1158{
1159	struct data_vio *data_vio = as_data_vio(completion);
1160
1161	assert_data_vio_in_allocated_zone(data_vio);
1162	release_data_vio_allocation_lock(data_vio, false);
1163	perform_cleanup_stage(data_vio, VIO_RELEASE_RECOVERY_LOCKS);
1164}
1165
1166/** release_lock() - Release an uncontended LBN lock. */
1167static void release_lock(struct data_vio *data_vio, struct lbn_lock *lock)
1168{
1169	struct int_map *lock_map = lock->zone->lbn_operations;
1170	struct data_vio *lock_holder;
1171
1172	if (!lock->locked) {
		/* The lock is not locked, so it had better not be registered in the lock map. */
1174		struct data_vio *lock_holder = vdo_int_map_get(lock_map, lock->lbn);
1175
1176		VDO_ASSERT_LOG_ONLY((data_vio != lock_holder),
1177				    "no logical block lock held for block %llu",
1178				    (unsigned long long) lock->lbn);
1179		return;
1180	}
1181
1182	/* Release the lock by removing the lock from the map. */
1183	lock_holder = vdo_int_map_remove(lock_map, lock->lbn);
1184	VDO_ASSERT_LOG_ONLY((data_vio == lock_holder),
1185			    "logical block lock mismatch for block %llu",
1186			    (unsigned long long) lock->lbn);
1187	lock->locked = false;
1188}
1189
1190/** transfer_lock() - Transfer a contended LBN lock to the eldest waiter. */
1191static void transfer_lock(struct data_vio *data_vio, struct lbn_lock *lock)
1192{
1193	struct data_vio *lock_holder, *next_lock_holder;
1194	int result;
1195
1196	VDO_ASSERT_LOG_ONLY(lock->locked, "lbn_lock with waiters is not locked");
1197
1198	/* Another data_vio is waiting for the lock, transfer it in a single lock map operation. */
1199	next_lock_holder =
1200		vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters));
1201
1202	/* Transfer the remaining lock waiters to the next lock holder. */
1203	vdo_waitq_transfer_all_waiters(&lock->waiters,
1204				       &next_lock_holder->logical.waiters);
1205
1206	result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn,
1207				 next_lock_holder, true, (void **) &lock_holder);
1208	if (result != VDO_SUCCESS) {
1209		continue_data_vio_with_error(next_lock_holder, result);
1210		return;
1211	}
1212
1213	VDO_ASSERT_LOG_ONLY((lock_holder == data_vio),
1214			    "logical block lock mismatch for block %llu",
1215			    (unsigned long long) lock->lbn);
1216	lock->locked = false;
1217
1218	/*
1219	 * If there are still waiters, other data_vios must be trying to get the lock we just
1220	 * transferred. We must ensure that the new lock holder doesn't block in the packer.
1221	 */
1222	if (vdo_waitq_has_waiters(&next_lock_holder->logical.waiters))
1223		cancel_data_vio_compression(next_lock_holder);
1224
1225	/*
1226	 * Avoid stack overflow on lock transfer.
1227	 * FIXME: this is only an issue in the 1 thread config.
1228	 */
1229	next_lock_holder->vio.completion.requeue = true;
1230	launch_locked_request(next_lock_holder);
1231}
1232
1233/**
1234 * release_logical_lock() - Release the logical block lock and flush generation lock at the end of
1235 *			    processing a data_vio.
1236 */
1237static void release_logical_lock(struct vdo_completion *completion)
1238{
1239	struct data_vio *data_vio = as_data_vio(completion);
1240	struct lbn_lock *lock = &data_vio->logical;
1241
1242	assert_data_vio_in_logical_zone(data_vio);
1243
1244	if (vdo_waitq_has_waiters(&lock->waiters))
1245		transfer_lock(data_vio, lock);
1246	else
1247		release_lock(data_vio, lock);
1248
1249	vdo_release_flush_generation_lock(data_vio);
1250	perform_cleanup_stage(data_vio, VIO_CLEANUP_DONE);
1251}
1252
1253/** clean_hash_lock() - Release the hash lock at the end of processing a data_vio. */
1254static void clean_hash_lock(struct vdo_completion *completion)
1255{
1256	struct data_vio *data_vio = as_data_vio(completion);
1257
1258	assert_data_vio_in_hash_zone(data_vio);
1259	if (completion->result != VDO_SUCCESS) {
1260		vdo_clean_failed_hash_lock(data_vio);
1261		return;
1262	}
1263
1264	vdo_release_hash_lock(data_vio);
1265	perform_cleanup_stage(data_vio, VIO_RELEASE_LOGICAL);
1266}
1267
/**
 * finish_cleanup() - Make some assertions about a data_vio which has finished cleaning up.
 *
 * If the data_vio is part of a multi-block discard, this launches it on the next block;
 * otherwise, it returns the data_vio to the pool.
 */
1274static void finish_cleanup(struct data_vio *data_vio)
1275{
1276	struct vdo_completion *completion = &data_vio->vio.completion;
1277
1278	VDO_ASSERT_LOG_ONLY(data_vio->allocation.lock == NULL,
1279			    "complete data_vio has no allocation lock");
1280	VDO_ASSERT_LOG_ONLY(data_vio->hash_lock == NULL,
1281			    "complete data_vio has no hash lock");
1282	if ((data_vio->remaining_discard <= VDO_BLOCK_SIZE) ||
1283	    (completion->result != VDO_SUCCESS)) {
1284		struct data_vio_pool *pool = completion->vdo->data_vio_pool;
1285
1286		vdo_funnel_queue_put(pool->queue, &completion->work_queue_entry_link);
1287		schedule_releases(pool);
1288		return;
1289	}
1290
1291	data_vio->remaining_discard -= min_t(u32, data_vio->remaining_discard,
1292					     VDO_BLOCK_SIZE - data_vio->offset);
1293	data_vio->is_partial = (data_vio->remaining_discard < VDO_BLOCK_SIZE);
1294	data_vio->read = data_vio->is_partial;
1295	data_vio->offset = 0;
1296	completion->requeue = true;
1297	launch_data_vio(data_vio, data_vio->logical.lbn + 1);
1298}
1299
1300/** perform_cleanup_stage() - Perform the next step in the process of cleaning up a data_vio. */
1301static void perform_cleanup_stage(struct data_vio *data_vio,
1302				  enum data_vio_cleanup_stage stage)
1303{
1304	struct vdo *vdo = vdo_from_data_vio(data_vio);
1305
1306	switch (stage) {
1307	case VIO_RELEASE_HASH_LOCK:
1308		if (data_vio->hash_lock != NULL) {
1309			launch_data_vio_hash_zone_callback(data_vio, clean_hash_lock);
1310			return;
1311		}
1312		fallthrough;
1313
1314	case VIO_RELEASE_ALLOCATED:
1315		if (data_vio_has_allocation(data_vio)) {
1316			launch_data_vio_allocated_zone_callback(data_vio,
1317								release_allocated_lock);
1318			return;
1319		}
1320		fallthrough;
1321
1322	case VIO_RELEASE_RECOVERY_LOCKS:
1323		if ((data_vio->recovery_sequence_number > 0) &&
1324		    (READ_ONCE(vdo->read_only_notifier.read_only_error) == VDO_SUCCESS) &&
1325		    (data_vio->vio.completion.result != VDO_READ_ONLY))
1326			vdo_log_warning("VDO not read-only when cleaning data_vio with RJ lock");
1327		fallthrough;
1328
1329	case VIO_RELEASE_LOGICAL:
1330		launch_data_vio_logical_callback(data_vio, release_logical_lock);
1331		return;
1332
1333	default:
1334		finish_cleanup(data_vio);
1335	}
1336}
1337
1338void complete_data_vio(struct vdo_completion *completion)
1339{
1340	struct data_vio *data_vio = as_data_vio(completion);
1341
1342	completion->error_handler = NULL;
1343	data_vio->last_async_operation = VIO_ASYNC_OP_CLEANUP;
1344	perform_cleanup_stage(data_vio,
1345			      (data_vio->write ? VIO_CLEANUP_START : VIO_RELEASE_LOGICAL));
1346}
1347
1348static void enter_read_only_mode(struct vdo_completion *completion)
1349{
1350	if (vdo_is_read_only(completion->vdo))
1351		return;
1352
1353	if (completion->result != VDO_READ_ONLY) {
1354		struct data_vio *data_vio = as_data_vio(completion);
1355
1356		vdo_log_error_strerror(completion->result,
1357				       "Preparing to enter read-only mode: data_vio for LBN %llu (becoming mapped to %llu, previously mapped to %llu, allocated %llu) is completing with a fatal error after operation %s",
1358				       (unsigned long long) data_vio->logical.lbn,
1359				       (unsigned long long) data_vio->new_mapped.pbn,
1360				       (unsigned long long) data_vio->mapped.pbn,
1361				       (unsigned long long) data_vio->allocation.pbn,
1362				       get_data_vio_operation_name(data_vio));
1363	}
1364
1365	vdo_enter_read_only_mode(completion->vdo, completion->result);
1366}
1367
1368void handle_data_vio_error(struct vdo_completion *completion)
1369{
1370	struct data_vio *data_vio = as_data_vio(completion);
1371
1372	if ((completion->result == VDO_READ_ONLY) || (data_vio->user_bio == NULL))
1373		enter_read_only_mode(completion);
1374
1375	update_data_vio_error_stats(data_vio);
1376	complete_data_vio(completion);
1377}
1378
1379/**
1380 * get_data_vio_operation_name() - Get the name of the last asynchronous operation performed on a
1381 *				   data_vio.
1382 */
1383const char *get_data_vio_operation_name(struct data_vio *data_vio)
1384{
1385	BUILD_BUG_ON((MAX_VIO_ASYNC_OPERATION_NUMBER - MIN_VIO_ASYNC_OPERATION_NUMBER) !=
1386		     ARRAY_SIZE(ASYNC_OPERATION_NAMES));
1387
1388	return ((data_vio->last_async_operation < MAX_VIO_ASYNC_OPERATION_NUMBER) ?
1389		ASYNC_OPERATION_NAMES[data_vio->last_async_operation] :
1390		"unknown async operation");
1391}
1392
/**
 * data_vio_allocate_data_block() - Allocate a data block.
 * @data_vio: The data_vio which needs an allocation.
 * @write_lock_type: The type of write lock to obtain on the block.
 * @callback: The callback which will attempt an allocation in the current zone and continue if it
 *	      succeeds.
 * @error_handler: The handler for errors while allocating.
 */
1401void data_vio_allocate_data_block(struct data_vio *data_vio,
1402				  enum pbn_lock_type write_lock_type,
1403				  vdo_action_fn callback, vdo_action_fn error_handler)
1404{
1405	struct allocation *allocation = &data_vio->allocation;
1406
1407	VDO_ASSERT_LOG_ONLY((allocation->pbn == VDO_ZERO_BLOCK),
1408			    "data_vio does not have an allocation");
1409	allocation->write_lock_type = write_lock_type;
1410	allocation->zone = vdo_get_next_allocation_zone(data_vio->logical.zone);
1411	allocation->first_allocation_zone = allocation->zone->zone_number;
1412
1413	data_vio->vio.completion.error_handler = error_handler;
1414	launch_data_vio_allocated_zone_callback(data_vio, callback);
1415}
1416
1417/**
1418 * release_data_vio_allocation_lock() - Release the PBN lock on a data_vio's allocated block.
1419 * @reset: If true, the allocation will be reset (i.e. any allocated pbn will be forgotten).
1420 *
1421 * If the reference to the locked block is still provisional, it will be released as well.
1422 */
1423void release_data_vio_allocation_lock(struct data_vio *data_vio, bool reset)
1424{
1425	struct allocation *allocation = &data_vio->allocation;
1426	physical_block_number_t locked_pbn = allocation->pbn;
1427
1428	assert_data_vio_in_allocated_zone(data_vio);
1429
1430	if (reset || vdo_pbn_lock_has_provisional_reference(allocation->lock))
1431		allocation->pbn = VDO_ZERO_BLOCK;
1432
1433	vdo_release_physical_zone_pbn_lock(allocation->zone, locked_pbn,
1434					   vdo_forget(allocation->lock));
1435}
1436
1437/**
1438 * uncompress_data_vio() - Uncompress the data a data_vio has just read.
1439 * @mapping_state: The mapping state indicating which fragment to decompress.
1440 * @buffer: The buffer to receive the uncompressed data.
1441 */
1442int uncompress_data_vio(struct data_vio *data_vio,
1443			enum block_mapping_state mapping_state, char *buffer)
1444{
1445	int size;
1446	u16 fragment_offset, fragment_size;
1447	struct compressed_block *block = data_vio->compression.block;
1448	int result = vdo_get_compressed_block_fragment(mapping_state, block,
1449						       &fragment_offset, &fragment_size);
1450
1451	if (result != VDO_SUCCESS) {
1452		vdo_log_debug("%s: compressed fragment error %d", __func__, result);
1453		return result;
1454	}
1455
1456	size = LZ4_decompress_safe((block->data + fragment_offset), buffer,
1457				   fragment_size, VDO_BLOCK_SIZE);
1458	if (size != VDO_BLOCK_SIZE) {
1459		vdo_log_debug("%s: lz4 error", __func__);
1460		return VDO_INVALID_FRAGMENT;
1461	}
1462
1463	return VDO_SUCCESS;
1464}
1465
1466/**
1467 * modify_for_partial_write() - Do the modify-write part of a read-modify-write cycle.
1468 * @completion: The data_vio which has just finished its read.
1469 *
1470 * This callback is registered in read_block().
1471 */
1472static void modify_for_partial_write(struct vdo_completion *completion)
1473{
1474	struct data_vio *data_vio = as_data_vio(completion);
1475	char *data = data_vio->vio.data;
1476	struct bio *bio = data_vio->user_bio;
1477
1478	assert_data_vio_on_cpu_thread(data_vio);
1479
1480	if (bio_op(bio) == REQ_OP_DISCARD) {
1481		memset(data + data_vio->offset, '\0', min_t(u32,
1482							    data_vio->remaining_discard,
1483							    VDO_BLOCK_SIZE - data_vio->offset));
1484	} else {
1485		copy_from_bio(bio, data + data_vio->offset);
1486	}
1487
1488	data_vio->is_zero = is_zero_block(data);
1489	data_vio->read = false;
1490	launch_data_vio_logical_callback(data_vio,
1491					 continue_data_vio_with_block_map_slot);
1492}
1493
1494static void complete_read(struct vdo_completion *completion)
1495{
1496	struct data_vio *data_vio = as_data_vio(completion);
1497	char *data = data_vio->vio.data;
1498	bool compressed = vdo_is_state_compressed(data_vio->mapped.state);
1499
1500	assert_data_vio_on_cpu_thread(data_vio);
1501
1502	if (compressed) {
1503		int result = uncompress_data_vio(data_vio, data_vio->mapped.state, data);
1504
1505		if (result != VDO_SUCCESS) {
1506			continue_data_vio_with_error(data_vio, result);
1507			return;
1508		}
1509	}
1510
1511	if (data_vio->write) {
1512		modify_for_partial_write(completion);
1513		return;
1514	}
1515
1516	if (compressed || data_vio->is_partial)
1517		copy_to_bio(data_vio->user_bio, data + data_vio->offset);
1518
1519	acknowledge_data_vio(data_vio);
1520	complete_data_vio(completion);
1521}
1522
1523static void read_endio(struct bio *bio)
1524{
1525	struct data_vio *data_vio = vio_as_data_vio(bio->bi_private);
1526	int result = blk_status_to_errno(bio->bi_status);
1527
1528	vdo_count_completed_bios(bio);
1529	if (result != VDO_SUCCESS) {
1530		continue_data_vio_with_error(data_vio, result);
1531		return;
1532	}
1533
1534	launch_data_vio_cpu_callback(data_vio, complete_read,
1535				     CPU_Q_COMPLETE_READ_PRIORITY);
1536}
1537
1538static void complete_zero_read(struct vdo_completion *completion)
1539{
1540	struct data_vio *data_vio = as_data_vio(completion);
1541
1542	assert_data_vio_on_cpu_thread(data_vio);
1543
1544	if (data_vio->is_partial) {
1545		memset(data_vio->vio.data, 0, VDO_BLOCK_SIZE);
1546		if (data_vio->write) {
1547			modify_for_partial_write(completion);
1548			return;
1549		}
1550	} else {
1551		zero_fill_bio(data_vio->user_bio);
1552	}
1553
1554	complete_read(completion);
1555}
1556
1557/**
1558 * read_block() - Read a block asynchronously.
1559 *
1560 * This is the callback registered in read_block_mapping().
1561 */
1562static void read_block(struct vdo_completion *completion)
1563{
1564	struct data_vio *data_vio = as_data_vio(completion);
1565	struct vio *vio = as_vio(completion);
1566	int result = VDO_SUCCESS;
1567
1568	if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
1569		launch_data_vio_cpu_callback(data_vio, complete_zero_read,
1570					     CPU_Q_COMPLETE_VIO_PRIORITY);
1571		return;
1572	}
1573
1574	data_vio->last_async_operation = VIO_ASYNC_OP_READ_DATA_VIO;
1575	if (vdo_is_state_compressed(data_vio->mapped.state)) {
1576		result = vio_reset_bio(vio, (char *) data_vio->compression.block,
1577				       read_endio, REQ_OP_READ, data_vio->mapped.pbn);
1578	} else {
1579		blk_opf_t opf = ((data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ);
1580
1581		if (data_vio->is_partial) {
1582			result = vio_reset_bio(vio, vio->data, read_endio, opf,
1583					       data_vio->mapped.pbn);
1584		} else {
1585			/* A full 4k read. Use the incoming bio to avoid having to copy the data */
1586			bio_reset(vio->bio, vio->bio->bi_bdev, opf);
1587			bio_init_clone(data_vio->user_bio->bi_bdev, vio->bio,
1588				       data_vio->user_bio, GFP_KERNEL);
1589
1590			/* Copy over the original bio iovec and opflags. */
1591			vdo_set_bio_properties(vio->bio, vio, read_endio, opf,
1592					       data_vio->mapped.pbn);
1593		}
1594	}
1595
1596	if (result != VDO_SUCCESS) {
1597		continue_data_vio_with_error(data_vio, result);
1598		return;
1599	}
1600
1601	vdo_submit_data_vio(data_vio);
1602}
1603
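/*
 * The block map update may be reached either via the data_vio's own completion (from the
 * increment path) or via the embedded decrement_completion, so recover the data_vio from
 * whichever completion was passed.
 */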
1604static inline struct data_vio *
1605reference_count_update_completion_as_data_vio(struct vdo_completion *completion)
1606{
1607	if (completion->type == VIO_COMPLETION)
1608		return as_data_vio(completion);
1609
1610	return container_of(completion, struct data_vio, decrement_completion);
1611}
1612
1613/**
1614 * update_block_map() - Rendezvous of the data_vio and decrement completions after each has
1615 *                      made its reference updates. Handle any error from either, or proceed
1616 *                      to updating the block map.
1617 * @completion: The completion of the write in progress.
1618 */
1619static void update_block_map(struct vdo_completion *completion)
1620{
1621	struct data_vio *data_vio = reference_count_update_completion_as_data_vio(completion);
1622
1623	assert_data_vio_in_logical_zone(data_vio);
1624
1625	if (!data_vio->first_reference_operation_complete) {
1626		/* Rendezvous, we're first */
1627		data_vio->first_reference_operation_complete = true;
1628		return;
1629	}
1630
1631	completion = &data_vio->vio.completion;
1632	vdo_set_completion_result(completion, data_vio->decrement_completion.result);
1633	if (completion->result != VDO_SUCCESS) {
1634		handle_data_vio_error(completion);
1635		return;
1636	}
1637
1638	completion->error_handler = handle_data_vio_error;
1639	if (data_vio->hash_lock != NULL)
1640		set_data_vio_hash_zone_callback(data_vio, vdo_continue_hash_lock);
1641	else
1642		completion->callback = complete_data_vio;
1643
1644	data_vio->last_async_operation = VIO_ASYNC_OP_PUT_MAPPED_BLOCK;
1645	vdo_put_mapped_block(data_vio);
1646}
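
/*
 * A minimal sketch (not compiled) of the rendezvous used above. Both the data_vio's own
 * completion and its decrement_completion are dispatched to the data_vio's logical zone thread,
 * so the two arrivals are serialized and a plain bool is race-free; no atomics or locks are
 * needed. The type and functions named example_* below are hypothetical and exist only to show
 * the shape of the logic.
 */
#if 0
struct example_rendezvous {
	bool first_arrival_seen;
};

static void example_arrival(struct example_rendezvous *rendezvous)
{
	if (!rendezvous->first_arrival_seen) {
		/* First arrival: note it and wait for the other completion. */
		rendezvous->first_arrival_seen = true;
		return;
	}

	/* Second arrival: both reference updates are done, so the write can continue. */
	example_continue_write(rendezvous);
}
#endif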
1647
1648static void decrement_reference_count(struct vdo_completion *completion)
1649{
1650	struct data_vio *data_vio = container_of(completion, struct data_vio,
1651						 decrement_completion);
1652
1653	assert_data_vio_in_mapped_zone(data_vio);
1654
1655	vdo_set_completion_callback(completion, update_block_map,
1656				    data_vio->logical.zone->thread_id);
1657	completion->error_handler = update_block_map;
1658	vdo_modify_reference_count(completion, &data_vio->decrement_updater);
1659}
1660
1661static void increment_reference_count(struct vdo_completion *completion)
1662{
1663	struct data_vio *data_vio = as_data_vio(completion);
1664
1665	assert_data_vio_in_new_mapped_zone(data_vio);
1666
1667	if (data_vio->downgrade_allocation_lock) {
1668		/*
1669		 * Now that the data has been written, it's safe to deduplicate against the
1670		 * block. Downgrade the allocation lock to a read lock so it can be used later by
1671		 * the hash lock. This is done here since it needs to happen sometime before we
1672		 * return to the hash zone, and we are currently on the correct thread. For
1673		 * compressed blocks, the downgrade will have already been done.
1674		 */
1675		vdo_downgrade_pbn_write_lock(data_vio->allocation.lock, false);
1676	}
1677
1678	set_data_vio_logical_callback(data_vio, update_block_map);
1679	completion->error_handler = update_block_map;
1680	vdo_modify_reference_count(completion, &data_vio->increment_updater);
1681}
1682
1683/** journal_remapping() - Add a recovery journal entry for a data remapping. */
1684static void journal_remapping(struct vdo_completion *completion)
1685{
1686	struct data_vio *data_vio = as_data_vio(completion);
1687
1688	assert_data_vio_in_journal_zone(data_vio);
1689
1690	data_vio->decrement_updater.operation = VDO_JOURNAL_DATA_REMAPPING;
1691	data_vio->decrement_updater.zpbn = data_vio->mapped;
1692	if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
1693		data_vio->first_reference_operation_complete = true;
1694		if (data_vio->mapped.pbn == VDO_ZERO_BLOCK)
1695			set_data_vio_logical_callback(data_vio, update_block_map);
1696	} else {
1697		set_data_vio_new_mapped_zone_callback(data_vio,
1698						      increment_reference_count);
1699	}
1700
1701	if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) {
1702		data_vio->first_reference_operation_complete = true;
1703	} else {
1704		vdo_set_completion_callback(&data_vio->decrement_completion,
1705					    decrement_reference_count,
1706					    data_vio->mapped.zone->thread_id);
1707	}
1708
1709	data_vio->last_async_operation = VIO_ASYNC_OP_JOURNAL_REMAPPING;
1710	vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
1711}
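
/*
 * The routing above pre-arms the rendezvous for whichever reference update is not needed, so
 * update_block_map() never waits for an arrival that will not happen. If both the old and new
 * mappings are the zero block, the flag is set twice and the data_vio proceeds directly to
 * update_block_map() on its logical zone once the journal entry is made. If only one side is the
 * zero block, the single remaining update finds the flag already set and carries on alone. If
 * both are real blocks, the increment and decrement run in their respective physical zones and
 * meet again in update_block_map().
 */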
1712
1713/**
1714 * read_old_block_mapping() - Get the previous PBN/LBN mapping of an in-progress write.
1715 *
1716 * Gets the previous PBN mapped to this LBN from the block map, so as to make an appropriate
1717 * journal entry referencing the removal of this LBN->PBN mapping.
1718 */
1719static void read_old_block_mapping(struct vdo_completion *completion)
1720{
1721	struct data_vio *data_vio = as_data_vio(completion);
1722
1723	assert_data_vio_in_logical_zone(data_vio);
1724
1725	data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_WRITE;
1726	set_data_vio_journal_callback(data_vio, journal_remapping);
1727	vdo_get_mapped_block(data_vio);
1728}
1729
1730void update_metadata_for_data_vio_write(struct data_vio *data_vio, struct pbn_lock *lock)
1731{
1732	data_vio->increment_updater = (struct reference_updater) {
1733		.operation = VDO_JOURNAL_DATA_REMAPPING,
1734		.increment = true,
1735		.zpbn = data_vio->new_mapped,
1736		.lock = lock,
1737	};
1738
1739	launch_data_vio_logical_callback(data_vio, read_old_block_mapping);
1740}
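
/*
 * Every write converges on update_metadata_for_data_vio_write(). Plain data writes arrive here
 * from write_bio_finished() holding their allocation's PBN lock, zero-block writes and discards
 * arrive with no lock at all, and the deduplication and compression paths are expected to arrive
 * holding the lock on the physical block they share. From here the sequence is always the same:
 * read the old mapping, journal the remapping, update the reference counts, and then update the
 * block map.
 */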
1741
1742/**
1743 * pack_compressed_data() - Attempt to pack the compressed data_vio into a block.
1744 *
 * This is the callback registered in compress_data_vio().
1746 */
1747static void pack_compressed_data(struct vdo_completion *completion)
1748{
1749	struct data_vio *data_vio = as_data_vio(completion);
1750
1751	assert_data_vio_in_packer_zone(data_vio);
1752
1753	if (!vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
1754	    get_data_vio_compression_status(data_vio).may_not_compress) {
1755		write_data_vio(data_vio);
1756		return;
1757	}
1758
1759	data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_PACKING;
1760	vdo_attempt_packing(data_vio);
1761}
1762
1763/**
1764 * compress_data_vio() - Do the actual work of compressing the data on a CPU queue.
1765 *
1766 * This callback is registered in launch_compress_data_vio().
1767 */
1768static void compress_data_vio(struct vdo_completion *completion)
1769{
1770	struct data_vio *data_vio = as_data_vio(completion);
1771	int size;
1772
1773	assert_data_vio_on_cpu_thread(data_vio);
1774
1775	/*
1776	 * By putting the compressed data at the start of the compressed block data field, we won't
1777	 * need to copy it if this data_vio becomes a compressed write agent.
1778	 */
1779	size = LZ4_compress_default(data_vio->vio.data,
1780				    data_vio->compression.block->data, VDO_BLOCK_SIZE,
1781				    VDO_MAX_COMPRESSED_FRAGMENT_SIZE,
1782				    (char *) vdo_get_work_queue_private_data());
1783	if ((size > 0) && (size < VDO_COMPRESSED_BLOCK_DATA_SIZE)) {
1784		data_vio->compression.size = size;
1785		launch_data_vio_packer_callback(data_vio, pack_compressed_data);
1786		return;
1787	}
1788
1789	write_data_vio(data_vio);
1790}
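
/*
 * A minimal sketch (not compiled) of the compression attempt above, assuming the caller owns a
 * 4 KB source buffer, a destination of at least VDO_MAX_COMPRESSED_FRAGMENT_SIZE bytes, and an
 * LZ4_MEM_COMPRESS-sized scratch area; example_try_compress() is hypothetical. A non-positive
 * return from LZ4_compress_default() means the data would not fit within the output bound, and a
 * result that is not strictly smaller than VDO_COMPRESSED_BLOCK_DATA_SIZE is not worth packing,
 * so both cases fall back to writing the block uncompressed.
 */
#if 0
static int example_try_compress(const char *src, char *dst, void *scratch)
{
	int size = LZ4_compress_default(src, dst, VDO_BLOCK_SIZE,
					VDO_MAX_COMPRESSED_FRAGMENT_SIZE, scratch);

	/* Return the compressed size only when packing it can actually save space. */
	return ((size > 0) && (size < VDO_COMPRESSED_BLOCK_DATA_SIZE)) ? size : 0;
}
#endif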
1791
1792/**
1793 * launch_compress_data_vio() - Continue a write by attempting to compress the data.
1794 *
1795 * This is a re-entry point to vio_write used by hash locks.
1796 */
1797void launch_compress_data_vio(struct data_vio *data_vio)
1798{
1799	VDO_ASSERT_LOG_ONLY(!data_vio->is_duplicate, "compressing a non-duplicate block");
1800	VDO_ASSERT_LOG_ONLY(data_vio->hash_lock != NULL,
1801			    "data_vio to compress has a hash_lock");
1802	VDO_ASSERT_LOG_ONLY(data_vio_has_allocation(data_vio),
1803			    "data_vio to compress has an allocation");
1804
1805	/*
1806	 * There are 4 reasons why a data_vio which has reached this point will not be eligible for
1807	 * compression:
1808	 *
1809	 * 1) Since data_vios can block indefinitely in the packer, it would be bad to do so if the
1810	 * write request also requests FUA.
1811	 *
1812	 * 2) A data_vio should not be compressed when compression is disabled for the vdo.
1813	 *
1814	 * 3) A data_vio could be doing a partial write on behalf of a larger discard which has not
1815	 * yet been acknowledged and hence blocking in the packer would be bad.
1816	 *
1817	 * 4) Some other data_vio may be waiting on this data_vio in which case blocking in the
1818	 * packer would also be bad.
1819	 */
1820	if (data_vio->fua ||
1821	    !vdo_get_compressing(vdo_from_data_vio(data_vio)) ||
1822	    ((data_vio->user_bio != NULL) && (bio_op(data_vio->user_bio) == REQ_OP_DISCARD)) ||
1823	    (advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_COMPRESSING)) {
1824		write_data_vio(data_vio);
1825		return;
1826	}
1827
1828	data_vio->last_async_operation = VIO_ASYNC_OP_COMPRESS_DATA_VIO;
1829	launch_data_vio_cpu_callback(data_vio, compress_data_vio,
1830				     CPU_Q_COMPRESS_BLOCK_PRIORITY);
1831}
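
/*
 * The eligibility test above, restated as a hypothetical predicate purely for readability. Note
 * that advance_data_vio_compression_stage() is not a pure check: it also records the data_vio's
 * transition into the compression path, so it is kept as the last condition evaluated.
 */
#if 0
static bool example_may_enter_packer(struct data_vio *data_vio)
{
	return (!data_vio->fua &&					/* reason 1 */
		vdo_get_compressing(vdo_from_data_vio(data_vio)) &&	/* reason 2 */
		((data_vio->user_bio == NULL) ||
		 (bio_op(data_vio->user_bio) != REQ_OP_DISCARD)) &&	/* reason 3 */
		(advance_data_vio_compression_stage(data_vio).stage ==
		 DATA_VIO_COMPRESSING));				/* reason 4 */
}
#endif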
1832
1833/**
1834 * hash_data_vio() - Hash the data in a data_vio and set the hash zone (which also flags the record
1835 *		     name as set).
 *
1837 * This callback is registered in prepare_for_dedupe().
1838 */
1839static void hash_data_vio(struct vdo_completion *completion)
1840{
1841	struct data_vio *data_vio = as_data_vio(completion);
1842
1843	assert_data_vio_on_cpu_thread(data_vio);
1844	VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "zero blocks should not be hashed");
1845
1846	murmurhash3_128(data_vio->vio.data, VDO_BLOCK_SIZE, 0x62ea60be,
1847			&data_vio->record_name);
1848
1849	data_vio->hash_zone = vdo_select_hash_zone(vdo_from_data_vio(data_vio)->hash_zones,
1850						   &data_vio->record_name);
1851	data_vio->last_async_operation = VIO_ASYNC_OP_ACQUIRE_VDO_HASH_LOCK;
1852	launch_data_vio_hash_zone_callback(data_vio, vdo_acquire_hash_lock);
1853}
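
/*
 * A minimal sketch (not compiled) of computing a record name outside of a data_vio;
 * example_name_block() is hypothetical. The seed matches the one used above, and
 * murmurhash3_128() writes the full 128-bit name into the 16-byte output buffer. How a name is
 * mapped to a hash zone is left to vdo_select_hash_zone().
 */
#if 0
static void example_name_block(const char *block, u8 name[16])
{
	/* Hash the full 4 KB of block data into a 128-bit record name. */
	murmurhash3_128(block, VDO_BLOCK_SIZE, 0x62ea60be, name);
}
#endif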
1854
1855/** prepare_for_dedupe() - Prepare for the dedupe path after attempting to get an allocation. */
1856static void prepare_for_dedupe(struct data_vio *data_vio)
1857{
1858	/* We don't care what thread we are on. */
1859	VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "must not prepare to dedupe zero blocks");
1860
1861	/*
1862	 * Before we can dedupe, we need to know the record name, so the first
1863	 * step is to hash the block data.
1864	 */
1865	data_vio->last_async_operation = VIO_ASYNC_OP_HASH_DATA_VIO;
1866	launch_data_vio_cpu_callback(data_vio, hash_data_vio, CPU_Q_HASH_BLOCK_PRIORITY);
1867}
1868
1869/**
 * write_bio_finished() - This is the bio_end_io function registered in write_data_vio() to be
 *			  called when a data_vio's write to the underlying storage has completed.
1872 */
1873static void write_bio_finished(struct bio *bio)
1874{
1875	struct data_vio *data_vio = vio_as_data_vio((struct vio *) bio->bi_private);
1876
1877	vdo_count_completed_bios(bio);
1878	vdo_set_completion_result(&data_vio->vio.completion,
1879				  blk_status_to_errno(bio->bi_status));
1880	data_vio->downgrade_allocation_lock = true;
1881	update_metadata_for_data_vio_write(data_vio, data_vio->allocation.lock);
1882}
1883
1884/** write_data_vio() - Write a data block to storage without compression. */
1885void write_data_vio(struct data_vio *data_vio)
1886{
1887	struct data_vio_compression_status status, new_status;
1888	int result;
1889
1890	if (!data_vio_has_allocation(data_vio)) {
1891		/*
1892		 * There was no space to write this block and we failed to deduplicate or compress
1893		 * it.
1894		 */
1895		continue_data_vio_with_error(data_vio, VDO_NO_SPACE);
1896		return;
1897	}
1898
1899	new_status = (struct data_vio_compression_status) {
1900		.stage = DATA_VIO_POST_PACKER,
1901		.may_not_compress = true,
1902	};
1903
1904	do {
1905		status = get_data_vio_compression_status(data_vio);
1906	} while ((status.stage != DATA_VIO_POST_PACKER) &&
1907		 !set_data_vio_compression_status(data_vio, status, new_status));
1908
1909	/* Write the data from the data block buffer. */
1910	result = vio_reset_bio(&data_vio->vio, data_vio->vio.data,
1911			       write_bio_finished, REQ_OP_WRITE,
1912			       data_vio->allocation.pbn);
1913	if (result != VDO_SUCCESS) {
1914		continue_data_vio_with_error(data_vio, result);
1915		return;
1916	}
1917
1918	data_vio->last_async_operation = VIO_ASYNC_OP_WRITE_DATA_VIO;
1919	vdo_submit_data_vio(data_vio);
1920}
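
/*
 * The do/while above is a lock-free advance: keep re-reading the shared compression status and
 * trying to install "post-packer, may not compress" until either the update lands or the status
 * is seen to already be DATA_VIO_POST_PACKER. A sketch of the same shape on a bare atomic_t
 * (illustrative only; the real status is packed and managed by
 * set_data_vio_compression_status()):
 */
#if 0
static void example_advance_status(atomic_t *status, int post_packer, int final_status)
{
	int old;

	do {
		old = atomic_read(status);
		if (old == post_packer)
			return;	/* Someone else already finalized the status. */
	} while (atomic_cmpxchg(status, old, final_status) != old);
}
#endif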
1921
1922/**
1923 * acknowledge_write_callback() - Acknowledge a write to the requestor.
1924 *
 * This callback is registered in allocate_block() and continue_data_vio_with_block_map_slot().
1926 */
1927static void acknowledge_write_callback(struct vdo_completion *completion)
1928{
1929	struct data_vio *data_vio = as_data_vio(completion);
1930	struct vdo *vdo = completion->vdo;
1931
1932	VDO_ASSERT_LOG_ONLY((!vdo_uses_bio_ack_queue(vdo) ||
1933			     (vdo_get_callback_thread_id() == vdo->thread_config.bio_ack_thread)),
1934			    "%s() called on bio ack queue", __func__);
1935	VDO_ASSERT_LOG_ONLY(data_vio_has_flush_generation_lock(data_vio),
1936			    "write VIO to be acknowledged has a flush generation lock");
1937	acknowledge_data_vio(data_vio);
1938	if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) {
1939		/* This is a zero write or discard */
1940		update_metadata_for_data_vio_write(data_vio, NULL);
1941		return;
1942	}
1943
1944	prepare_for_dedupe(data_vio);
1945}
1946
1947/**
1948 * allocate_block() - Attempt to allocate a block in the current allocation zone.
1949 *
 * This callback is registered in continue_data_vio_with_block_map_slot().
1951 */
1952static void allocate_block(struct vdo_completion *completion)
1953{
1954	struct data_vio *data_vio = as_data_vio(completion);
1955
1956	assert_data_vio_in_allocated_zone(data_vio);
1957
1958	if (!vdo_allocate_block_in_zone(data_vio))
1959		return;
1960
1961	completion->error_handler = handle_data_vio_error;
1962	WRITE_ONCE(data_vio->allocation_succeeded, true);
1963	data_vio->new_mapped = (struct zoned_pbn) {
1964		.zone = data_vio->allocation.zone,
1965		.pbn = data_vio->allocation.pbn,
1966		.state = VDO_MAPPING_STATE_UNCOMPRESSED,
1967	};
1968
1969	if (data_vio->fua) {
1970		prepare_for_dedupe(data_vio);
1971		return;
1972	}
1973
1974	data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
1975	launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
1976}
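
/*
 * The split above reflects when it is safe to tell the submitter that the write is done. A
 * non-FUA write may be acknowledged as soon as a physical block has been reserved for it, since
 * the block layer only promises durability for such writes after a subsequent flush. A FUA write
 * must not be acknowledged until its data is actually persistent, so it bypasses the early
 * acknowledgment here and is acknowledged later in the write path.
 */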
1977
1978/**
1979 * handle_allocation_error() - Handle an error attempting to allocate a block.
1980 *
 * This error handler is registered in continue_data_vio_with_block_map_slot().
1982 */
1983static void handle_allocation_error(struct vdo_completion *completion)
1984{
1985	struct data_vio *data_vio = as_data_vio(completion);
1986
1987	if (completion->result == VDO_NO_SPACE) {
1988		/* We failed to get an allocation, but we can try to dedupe. */
1989		vdo_reset_completion(completion);
1990		completion->error_handler = handle_data_vio_error;
1991		prepare_for_dedupe(data_vio);
1992		return;
1993	}
1994
1995	/* We got a "real" error, not just a failure to allocate, so fail the request. */
1996	handle_data_vio_error(completion);
1997}
1998
1999static int assert_is_discard(struct data_vio *data_vio)
2000{
2001	int result = VDO_ASSERT(data_vio->is_discard,
2002				"data_vio with no block map page is a discard");
2003
2004	return ((result == VDO_SUCCESS) ? result : VDO_READ_ONLY);
2005}
2006
2007/**
2008 * continue_data_vio_with_block_map_slot() - Read the data_vio's mapping from the block map.
2009 *
2010 * This callback is registered in launch_read_data_vio().
2011 */
2012void continue_data_vio_with_block_map_slot(struct vdo_completion *completion)
2013{
2014	struct data_vio *data_vio = as_data_vio(completion);
2015
2016	assert_data_vio_in_logical_zone(data_vio);
2017	if (data_vio->read) {
2018		set_data_vio_logical_callback(data_vio, read_block);
2019		data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_READ;
2020		vdo_get_mapped_block(data_vio);
2021		return;
2022	}
2023
2024	vdo_acquire_flush_generation_lock(data_vio);
2025
2026	if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
2027		/*
2028		 * This is a discard for a block on a block map page which has not been allocated, so
2029		 * there's nothing more we need to do.
2030		 */
2031		completion->callback = complete_data_vio;
2032		continue_data_vio_with_error(data_vio, assert_is_discard(data_vio));
2033		return;
2034	}
2035
2036	/*
2037	 * We need an allocation if this is neither a full-block discard nor a
2038	 * full-block zero write.
2039	 */
2040	if (!data_vio->is_zero && (!data_vio->is_discard || data_vio->is_partial)) {
2041		data_vio_allocate_data_block(data_vio, VIO_WRITE_LOCK, allocate_block,
2042					     handle_allocation_error);
2043		return;
2044	}
2045
2047	/*
2048	 * We don't need to write any data, so skip allocation and just update the block map and
2049	 * reference counts (via the journal).
2050	 */
2051	data_vio->new_mapped.pbn = VDO_ZERO_BLOCK;
2052	if (data_vio->is_zero)
2053		data_vio->new_mapped.state = VDO_MAPPING_STATE_UNCOMPRESSED;
2054
2055	if (data_vio->remaining_discard > VDO_BLOCK_SIZE) {
2056		/* This is not the final block of a discard so we can't acknowledge it yet. */
2057		update_metadata_for_data_vio_write(data_vio, NULL);
2058		return;
2059	}
2060
2061	data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE;
2062	launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback);
2063}
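
/*
 * To summarize the write triage above: a discard aimed at a block map page that was never
 * allocated has nothing left to do; a write whose resulting block contains data (including the
 * read-modify-write behind a partial discard) needs a physical block and continues into
 * allocation and then deduplication or compression; and a full-block discard or all-zero write
 * simply maps the logical block to VDO_ZERO_BLOCK with no data I/O, deferring acknowledgment
 * only when this data_vio is covering one block of a larger discard that still has more blocks
 * to process.
 */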
2064