1193640Sariff// SPDX-License-Identifier: GPL-2.0-only 2193640Sariff/* 3193640Sariff * Copyright 2023 Red Hat 4193640Sariff */ 5193640Sariff 6193640Sariff#include "data-vio.h" 7193640Sariff 8193640Sariff#include <linux/atomic.h> 9193640Sariff#include <linux/bio.h> 10193640Sariff#include <linux/blkdev.h> 11193640Sariff#include <linux/delay.h> 12193640Sariff#include <linux/device-mapper.h> 13193640Sariff#include <linux/jiffies.h> 14193640Sariff#include <linux/kernel.h> 15193640Sariff#include <linux/list.h> 16193640Sariff#include <linux/lz4.h> 17193640Sariff#include <linux/minmax.h> 18193640Sariff#include <linux/sched.h> 19193640Sariff#include <linux/spinlock.h> 20193640Sariff#include <linux/wait.h> 21193640Sariff 22193640Sariff#include "logger.h" 23193640Sariff#include "memory-alloc.h" 24193640Sariff#include "murmurhash3.h" 25193640Sariff#include "permassert.h" 26193640Sariff 27193640Sariff#include "block-map.h" 28193640Sariff#include "dump.h" 29193640Sariff#include "encodings.h" 30193640Sariff#include "int-map.h" 31193640Sariff#include "io-submitter.h" 32193640Sariff#include "logical-zone.h" 33193640Sariff#include "packer.h" 34193640Sariff#include "recovery-journal.h" 35193640Sariff#include "slab-depot.h" 36193640Sariff#include "status-codes.h" 37193640Sariff#include "types.h" 38193640Sariff#include "vdo.h" 39193640Sariff#include "vio.h" 40193640Sariff#include "wait-queue.h" 41193640Sariff 42193640Sariff/** 43193640Sariff * DOC: Bio flags. 44193640Sariff * 45193640Sariff * For certain flags set on user bios, if the user bio has not yet been acknowledged, setting those 46193640Sariff * flags on our own bio(s) for that request may help underlying layers better fulfill the user 47193640Sariff * bio's needs. This constant contains the aggregate of those flags; VDO strips all the other 48193640Sariff * flags, as they convey incorrect information. 
 *
 * These flags are always irrelevant if we have already finished the user bio as they are only
 * hints on IO importance. If VDO has finished the user bio, any remaining IO done doesn't care how
 * important finishing the finished bio was.
 *
 * Note that bio.c contains the complete list of flags we believe may be set; the following list
 * explains the action taken with each of those flags VDO could receive:
 *
 * * REQ_SYNC: Passed down if the user bio is not yet completed, since it indicates the user bio
 *   completion is required for further work to be done by the issuer.
 * * REQ_META: Passed down if the user bio is not yet completed, since it may mean the lower layer
 *   treats it as more urgent, similar to REQ_SYNC.
 * * REQ_PRIO: Passed down if the user bio is not yet completed, since it indicates the user bio is
 *   important.
 * * REQ_NOMERGE: Set only if the incoming bio was split; irrelevant to VDO IO.
 * * REQ_IDLE: Set if the incoming bio had more IO quickly following; VDO's IO pattern doesn't
 *   match incoming IO, so this flag is incorrect for it.
 * * REQ_FUA: Handled separately, and irrelevant to VDO IO otherwise.
 * * REQ_RAHEAD: Passed down, as, for reads, it indicates trivial importance.
 * * REQ_BACKGROUND: Not passed down, as VIOs are a limited resource and VDO needs them recycled
 *   ASAP to service heavy load, which is the only place where REQ_BACKGROUND might aid in load
 *   prioritization.
71193640Sariff */ 72193640Sariffstatic blk_opf_t PASSTHROUGH_FLAGS = (REQ_PRIO | REQ_META | REQ_SYNC | REQ_RAHEAD); 73193640Sariff 74193640Sariff/** 75193640Sariff * DOC: 76193640Sariff * 77193640Sariff * The data_vio_pool maintains the pool of data_vios which a vdo uses to service incoming bios. For 78193640Sariff * correctness, and in order to avoid potentially expensive or blocking memory allocations during 79193640Sariff * normal operation, the number of concurrently active data_vios is capped. Furthermore, in order 80193640Sariff * to avoid starvation of reads and writes, at most 75% of the data_vios may be used for 81193640Sariff * discards. The data_vio_pool is responsible for enforcing these limits. Threads submitting bios 82193640Sariff * for which a data_vio or discard permit are not available will block until the necessary 83193640Sariff * resources are available. The pool is also responsible for distributing resources to blocked 84193640Sariff * threads and waking them. Finally, the pool attempts to batch the work of recycling data_vios by 85193640Sariff * performing the work of actually assigning resources to blocked threads or placing data_vios back 86193640Sariff * into the pool on a single cpu at a time. 87193640Sariff * 88193640Sariff * The pool contains two "limiters", one for tracking data_vios and one for tracking discard 89193640Sariff * permits. The limiters also provide safe cross-thread access to pool statistics without the need 90193640Sariff * to take the pool's lock. When a thread submits a bio to a vdo device, it will first attempt to 91193640Sariff * get a discard permit if it is a discard, and then to get a data_vio. If the necessary resources 92193640Sariff * are available, the incoming bio will be assigned to the acquired data_vio, and it will be 93193640Sariff * launched. 
However, if either of these is unavailable, the arrival time of the bio is recorded
 * in the bio's bi_private field, the bio and its submitter are both queued on the appropriate
 * limiter and the submitting thread will then put itself to sleep. (Note that this mechanism will
 * break if jiffies are only 32 bits.)
 *
 * Whenever a data_vio has completed processing for the bio it was servicing, release_data_vio()
 * will be called on it. This function will add the data_vio to a funnel queue, and then check the
 * state of the pool. If the pool is not currently processing released data_vios, the pool's
 * completion will be enqueued on a cpu queue. This obviates the need for the releasing threads to
 * hold the pool's lock, and also batches release work while avoiding starvation of the cpu
 * threads.
 *
 * Whenever the pool's completion is run on a cpu thread, it calls process_release_callback() which
 * processes a batch of returned data_vios (at most DATA_VIO_RELEASE_BATCH_SIZE) from the pool's
 * funnel queue. For each data_vio, it first checks whether that data_vio was processing a
 * discard. If so, and there is a blocked bio waiting for a discard permit, that permit is
 * notionally transferred to the eldest discard waiter, and that waiter is moved to the end of the
 * list of discard bios waiting for a data_vio. If there are no discard waiters, the discard permit
 * is returned to the pool. Next, the data_vio is assigned to the oldest blocked bio which either
 * has a discard permit, or doesn't need one, and relaunched. If neither of these exist, the
 * data_vio is returned to the
 * pool.
Finally, if any waiting bios were launched, the threads which blocked trying to submit 114193640Sariff * them are awakened. 115193640Sariff */ 116193640Sariff 117193640Sariff#define DATA_VIO_RELEASE_BATCH_SIZE 128 118193640Sariff 119193640Sariffstatic const unsigned int VDO_SECTORS_PER_BLOCK_MASK = VDO_SECTORS_PER_BLOCK - 1; 120193640Sariffstatic const u32 COMPRESSION_STATUS_MASK = 0xff; 121193640Sariffstatic const u32 MAY_NOT_COMPRESS_MASK = 0x80000000; 122193640Sariff 123193640Sariffstruct limiter; 124193640Sarifftypedef void (*assigner_fn)(struct limiter *limiter); 125193640Sariff 126193640Sariff/* Bookkeeping structure for a single type of resource. */ 127193640Sariffstruct limiter { 128193640Sariff /* The data_vio_pool to which this limiter belongs */ 129193640Sariff struct data_vio_pool *pool; 130193640Sariff /* The maximum number of data_vios available */ 131193640Sariff data_vio_count_t limit; 132193640Sariff /* The number of resources in use */ 133193640Sariff data_vio_count_t busy; 134193640Sariff /* The maximum number of resources ever simultaneously in use */ 135193640Sariff data_vio_count_t max_busy; 136193640Sariff /* The number of resources to release */ 137193640Sariff data_vio_count_t release_count; 138193640Sariff /* The number of waiters to wake */ 139193640Sariff data_vio_count_t wake_count; 140193640Sariff /* The list of waiting bios which are known to process_release_callback() */ 141193640Sariff struct bio_list waiters; 142193640Sariff /* The list of waiting bios which are not yet known to process_release_callback() */ 143 struct bio_list new_waiters; 144 /* The list of waiters which have their permits */ 145 struct bio_list *permitted_waiters; 146 /* The function for assigning a resource to a waiter */ 147 assigner_fn assigner; 148 /* The queue of blocked threads */ 149 wait_queue_head_t blocked_threads; 150 /* The arrival time of the eldest waiter */ 151 u64 arrival; 152}; 153 154/* 155 * A data_vio_pool is a collection of preallocated 
data_vios which may be acquired from any thread, 156 * and are released in batches. 157 */ 158struct data_vio_pool { 159 /* Completion for scheduling releases */ 160 struct vdo_completion completion; 161 /* The administrative state of the pool */ 162 struct admin_state state; 163 /* Lock protecting the pool */ 164 spinlock_t lock; 165 /* The main limiter controlling the total data_vios in the pool. */ 166 struct limiter limiter; 167 /* The limiter controlling data_vios for discard */ 168 struct limiter discard_limiter; 169 /* The list of bios which have discard permits but still need a data_vio */ 170 struct bio_list permitted_discards; 171 /* The list of available data_vios */ 172 struct list_head available; 173 /* The queue of data_vios waiting to be returned to the pool */ 174 struct funnel_queue *queue; 175 /* Whether the pool is processing, or scheduled to process releases */ 176 atomic_t processing; 177 /* The data vios in the pool */ 178 struct data_vio data_vios[]; 179}; 180 181static const char * const ASYNC_OPERATION_NAMES[] = { 182 "launch", 183 "acknowledge_write", 184 "acquire_hash_lock", 185 "attempt_logical_block_lock", 186 "lock_duplicate_pbn", 187 "check_for_duplication", 188 "cleanup", 189 "compress_data_vio", 190 "find_block_map_slot", 191 "get_mapped_block_for_read", 192 "get_mapped_block_for_write", 193 "hash_data_vio", 194 "journal_remapping", 195 "vdo_attempt_packing", 196 "put_mapped_block", 197 "read_data_vio", 198 "update_dedupe_index", 199 "update_reference_counts", 200 "verify_duplication", 201 "write_data_vio", 202}; 203 204/* The steps taken cleaning up a VIO, in the order they are performed. 
*/ 205enum data_vio_cleanup_stage { 206 VIO_CLEANUP_START, 207 VIO_RELEASE_HASH_LOCK = VIO_CLEANUP_START, 208 VIO_RELEASE_ALLOCATED, 209 VIO_RELEASE_RECOVERY_LOCKS, 210 VIO_RELEASE_LOGICAL, 211 VIO_CLEANUP_DONE 212}; 213 214static inline struct data_vio_pool * __must_check 215as_data_vio_pool(struct vdo_completion *completion) 216{ 217 vdo_assert_completion_type(completion, VDO_DATA_VIO_POOL_COMPLETION); 218 return container_of(completion, struct data_vio_pool, completion); 219} 220 221static inline u64 get_arrival_time(struct bio *bio) 222{ 223 return (u64) bio->bi_private; 224} 225 226/** 227 * check_for_drain_complete_locked() - Check whether a data_vio_pool has no outstanding data_vios 228 * or waiters while holding the pool's lock. 229 */ 230static bool check_for_drain_complete_locked(struct data_vio_pool *pool) 231{ 232 if (pool->limiter.busy > 0) 233 return false; 234 235 VDO_ASSERT_LOG_ONLY((pool->discard_limiter.busy == 0), 236 "no outstanding discard permits"); 237 238 return (bio_list_empty(&pool->limiter.new_waiters) && 239 bio_list_empty(&pool->discard_limiter.new_waiters)); 240} 241 242static void initialize_lbn_lock(struct data_vio *data_vio, logical_block_number_t lbn) 243{ 244 struct vdo *vdo = vdo_from_data_vio(data_vio); 245 zone_count_t zone_number; 246 struct lbn_lock *lock = &data_vio->logical; 247 248 lock->lbn = lbn; 249 lock->locked = false; 250 vdo_waitq_init(&lock->waiters); 251 zone_number = vdo_compute_logical_zone(data_vio); 252 lock->zone = &vdo->logical_zones->zones[zone_number]; 253} 254 255static void launch_locked_request(struct data_vio *data_vio) 256{ 257 data_vio->logical.locked = true; 258 if (data_vio->write) { 259 struct vdo *vdo = vdo_from_data_vio(data_vio); 260 261 if (vdo_is_read_only(vdo)) { 262 continue_data_vio_with_error(data_vio, VDO_READ_ONLY); 263 return; 264 } 265 } 266 267 data_vio->last_async_operation = VIO_ASYNC_OP_FIND_BLOCK_MAP_SLOT; 268 vdo_find_block_map_slot(data_vio); 269} 270 271static void 
acknowledge_data_vio(struct data_vio *data_vio) 272{ 273 struct vdo *vdo = vdo_from_data_vio(data_vio); 274 struct bio *bio = data_vio->user_bio; 275 int error = vdo_status_to_errno(data_vio->vio.completion.result); 276 277 if (bio == NULL) 278 return; 279 280 VDO_ASSERT_LOG_ONLY((data_vio->remaining_discard <= 281 (u32) (VDO_BLOCK_SIZE - data_vio->offset)), 282 "data_vio to acknowledge is not an incomplete discard"); 283 284 data_vio->user_bio = NULL; 285 vdo_count_bios(&vdo->stats.bios_acknowledged, bio); 286 if (data_vio->is_partial) 287 vdo_count_bios(&vdo->stats.bios_acknowledged_partial, bio); 288 289 bio->bi_status = errno_to_blk_status(error); 290 bio_endio(bio); 291} 292 293static void copy_to_bio(struct bio *bio, char *data_ptr) 294{ 295 struct bio_vec biovec; 296 struct bvec_iter iter; 297 298 bio_for_each_segment(biovec, bio, iter) { 299 memcpy_to_bvec(&biovec, data_ptr); 300 data_ptr += biovec.bv_len; 301 } 302} 303 304struct data_vio_compression_status get_data_vio_compression_status(struct data_vio *data_vio) 305{ 306 u32 packed = atomic_read(&data_vio->compression.status); 307 308 /* pairs with cmpxchg in set_data_vio_compression_status */ 309 smp_rmb(); 310 return (struct data_vio_compression_status) { 311 .stage = packed & COMPRESSION_STATUS_MASK, 312 .may_not_compress = ((packed & MAY_NOT_COMPRESS_MASK) != 0), 313 }; 314} 315 316/** 317 * pack_status() - Convert a data_vio_compression_status into a u32 which may be stored 318 * atomically. 319 * @status: The state to convert. 320 * 321 * Return: The compression state packed into a u32. 322 */ 323static u32 __must_check pack_status(struct data_vio_compression_status status) 324{ 325 return status.stage | (status.may_not_compress ? MAY_NOT_COMPRESS_MASK : 0); 326} 327 328/** 329 * set_data_vio_compression_status() - Set the compression status of a data_vio. 330 * @state: The expected current status of the data_vio. 331 * @new_state: The status to set. 
332 * 333 * Return: true if the new status was set, false if the data_vio's compression status did not 334 * match the expected state, and so was left unchanged. 335 */ 336static bool __must_check 337set_data_vio_compression_status(struct data_vio *data_vio, 338 struct data_vio_compression_status status, 339 struct data_vio_compression_status new_status) 340{ 341 u32 actual; 342 u32 expected = pack_status(status); 343 u32 replacement = pack_status(new_status); 344 345 /* 346 * Extra barriers because this was original developed using a CAS operation that implicitly 347 * had them. 348 */ 349 smp_mb__before_atomic(); 350 actual = atomic_cmpxchg(&data_vio->compression.status, expected, replacement); 351 /* same as before_atomic */ 352 smp_mb__after_atomic(); 353 return (expected == actual); 354} 355 356struct data_vio_compression_status advance_data_vio_compression_stage(struct data_vio *data_vio) 357{ 358 for (;;) { 359 struct data_vio_compression_status status = 360 get_data_vio_compression_status(data_vio); 361 struct data_vio_compression_status new_status = status; 362 363 if (status.stage == DATA_VIO_POST_PACKER) { 364 /* We're already in the last stage. */ 365 return status; 366 } 367 368 if (status.may_not_compress) { 369 /* 370 * Compression has been dis-allowed for this VIO, so skip the rest of the 371 * path and go to the end. 372 */ 373 new_status.stage = DATA_VIO_POST_PACKER; 374 } else { 375 /* Go to the next state. */ 376 new_status.stage++; 377 } 378 379 if (set_data_vio_compression_status(data_vio, status, new_status)) 380 return new_status; 381 382 /* Another thread changed the status out from under us so try again. */ 383 } 384} 385 386/** 387 * cancel_data_vio_compression() - Prevent this data_vio from being compressed or packed. 388 * 389 * Return: true if the data_vio is in the packer and the caller was the first caller to cancel it. 
390 */ 391bool cancel_data_vio_compression(struct data_vio *data_vio) 392{ 393 struct data_vio_compression_status status, new_status; 394 395 for (;;) { 396 status = get_data_vio_compression_status(data_vio); 397 if (status.may_not_compress || (status.stage == DATA_VIO_POST_PACKER)) { 398 /* This data_vio is already set up to not block in the packer. */ 399 break; 400 } 401 402 new_status.stage = status.stage; 403 new_status.may_not_compress = true; 404 405 if (set_data_vio_compression_status(data_vio, status, new_status)) 406 break; 407 } 408 409 return ((status.stage == DATA_VIO_PACKING) && !status.may_not_compress); 410} 411 412/** 413 * attempt_logical_block_lock() - Attempt to acquire the lock on a logical block. 414 * @completion: The data_vio for an external data request as a completion. 415 * 416 * This is the start of the path for all external requests. It is registered in launch_data_vio(). 417 */ 418static void attempt_logical_block_lock(struct vdo_completion *completion) 419{ 420 struct data_vio *data_vio = as_data_vio(completion); 421 struct lbn_lock *lock = &data_vio->logical; 422 struct vdo *vdo = vdo_from_data_vio(data_vio); 423 struct data_vio *lock_holder; 424 int result; 425 426 assert_data_vio_in_logical_zone(data_vio); 427 428 if (data_vio->logical.lbn >= vdo->states.vdo.config.logical_blocks) { 429 continue_data_vio_with_error(data_vio, VDO_OUT_OF_RANGE); 430 return; 431 } 432 433 result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn, 434 data_vio, false, (void **) &lock_holder); 435 if (result != VDO_SUCCESS) { 436 continue_data_vio_with_error(data_vio, result); 437 return; 438 } 439 440 if (lock_holder == NULL) { 441 /* We got the lock */ 442 launch_locked_request(data_vio); 443 return; 444 } 445 446 result = VDO_ASSERT(lock_holder->logical.locked, "logical block lock held"); 447 if (result != VDO_SUCCESS) { 448 continue_data_vio_with_error(data_vio, result); 449 return; 450 } 451 452 /* 453 * If the new request is a pure read 
request (not read-modify-write) and the lock_holder is 454 * writing and has received an allocation, service the read request immediately by copying 455 * data from the lock_holder to avoid having to flush the write out of the packer just to 456 * prevent the read from waiting indefinitely. If the lock_holder does not yet have an 457 * allocation, prevent it from blocking in the packer and wait on it. This is necessary in 458 * order to prevent returning data that may not have actually been written. 459 */ 460 if (!data_vio->write && READ_ONCE(lock_holder->allocation_succeeded)) { 461 copy_to_bio(data_vio->user_bio, lock_holder->vio.data + data_vio->offset); 462 acknowledge_data_vio(data_vio); 463 complete_data_vio(completion); 464 return; 465 } 466 467 data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_LOGICAL_BLOCK_LOCK; 468 vdo_waitq_enqueue_waiter(&lock_holder->logical.waiters, &data_vio->waiter); 469 470 /* 471 * Prevent writes and read-modify-writes from blocking indefinitely on lock holders in the 472 * packer. 473 */ 474 if (lock_holder->write && cancel_data_vio_compression(lock_holder)) { 475 data_vio->compression.lock_holder = lock_holder; 476 launch_data_vio_packer_callback(data_vio, 477 vdo_remove_lock_holder_from_packer); 478 } 479} 480 481/** 482 * launch_data_vio() - (Re)initialize a data_vio to have a new logical block number, keeping the 483 * same parent and other state and send it on its way. 484 */ 485static void launch_data_vio(struct data_vio *data_vio, logical_block_number_t lbn) 486{ 487 struct vdo_completion *completion = &data_vio->vio.completion; 488 489 /* 490 * Clearing the tree lock must happen before initializing the LBN lock, which also adds 491 * information to the tree lock. 
492 */ 493 memset(&data_vio->tree_lock, 0, sizeof(data_vio->tree_lock)); 494 initialize_lbn_lock(data_vio, lbn); 495 INIT_LIST_HEAD(&data_vio->hash_lock_entry); 496 INIT_LIST_HEAD(&data_vio->write_entry); 497 498 memset(&data_vio->allocation, 0, sizeof(data_vio->allocation)); 499 500 data_vio->is_duplicate = false; 501 502 memset(&data_vio->record_name, 0, sizeof(data_vio->record_name)); 503 memset(&data_vio->duplicate, 0, sizeof(data_vio->duplicate)); 504 vdo_reset_completion(completion); 505 completion->error_handler = handle_data_vio_error; 506 set_data_vio_logical_callback(data_vio, attempt_logical_block_lock); 507 vdo_enqueue_completion(completion, VDO_DEFAULT_Q_MAP_BIO_PRIORITY); 508} 509 510static bool is_zero_block(char *block) 511{ 512 int i; 513 514 for (i = 0; i < VDO_BLOCK_SIZE; i += sizeof(u64)) { 515 if (*((u64 *) &block[i])) 516 return false; 517 } 518 519 return true; 520} 521 522static void copy_from_bio(struct bio *bio, char *data_ptr) 523{ 524 struct bio_vec biovec; 525 struct bvec_iter iter; 526 527 bio_for_each_segment(biovec, bio, iter) { 528 memcpy_from_bvec(data_ptr, &biovec); 529 data_ptr += biovec.bv_len; 530 } 531} 532 533static void launch_bio(struct vdo *vdo, struct data_vio *data_vio, struct bio *bio) 534{ 535 logical_block_number_t lbn; 536 /* 537 * Zero out the fields which don't need to be preserved (i.e. which are not pointers to 538 * separately allocated objects). 539 */ 540 memset(data_vio, 0, offsetof(struct data_vio, vio)); 541 memset(&data_vio->compression, 0, offsetof(struct compression_state, block)); 542 543 data_vio->user_bio = bio; 544 data_vio->offset = to_bytes(bio->bi_iter.bi_sector & VDO_SECTORS_PER_BLOCK_MASK); 545 data_vio->is_partial = (bio->bi_iter.bi_size < VDO_BLOCK_SIZE) || (data_vio->offset != 0); 546 547 /* 548 * Discards behave very differently than other requests when coming in from device-mapper. 549 * We have to be able to handle any size discards and various sector offsets within a 550 * block. 
551 */ 552 if (bio_op(bio) == REQ_OP_DISCARD) { 553 data_vio->remaining_discard = bio->bi_iter.bi_size; 554 data_vio->write = true; 555 data_vio->is_discard = true; 556 if (data_vio->is_partial) { 557 vdo_count_bios(&vdo->stats.bios_in_partial, bio); 558 data_vio->read = true; 559 } 560 } else if (data_vio->is_partial) { 561 vdo_count_bios(&vdo->stats.bios_in_partial, bio); 562 data_vio->read = true; 563 if (bio_data_dir(bio) == WRITE) 564 data_vio->write = true; 565 } else if (bio_data_dir(bio) == READ) { 566 data_vio->read = true; 567 } else { 568 /* 569 * Copy the bio data to a char array so that we can continue to use the data after 570 * we acknowledge the bio. 571 */ 572 copy_from_bio(bio, data_vio->vio.data); 573 data_vio->is_zero = is_zero_block(data_vio->vio.data); 574 data_vio->write = true; 575 } 576 577 if (data_vio->user_bio->bi_opf & REQ_FUA) 578 data_vio->fua = true; 579 580 lbn = (bio->bi_iter.bi_sector - vdo->starting_sector_offset) / VDO_SECTORS_PER_BLOCK; 581 launch_data_vio(data_vio, lbn); 582} 583 584static void assign_data_vio(struct limiter *limiter, struct data_vio *data_vio) 585{ 586 struct bio *bio = bio_list_pop(limiter->permitted_waiters); 587 588 launch_bio(limiter->pool->completion.vdo, data_vio, bio); 589 limiter->wake_count++; 590 591 bio = bio_list_peek(limiter->permitted_waiters); 592 limiter->arrival = ((bio == NULL) ? 
U64_MAX : get_arrival_time(bio)); 593} 594 595static void assign_discard_permit(struct limiter *limiter) 596{ 597 struct bio *bio = bio_list_pop(&limiter->waiters); 598 599 if (limiter->arrival == U64_MAX) 600 limiter->arrival = get_arrival_time(bio); 601 602 bio_list_add(limiter->permitted_waiters, bio); 603} 604 605static void get_waiters(struct limiter *limiter) 606{ 607 bio_list_merge(&limiter->waiters, &limiter->new_waiters); 608 bio_list_init(&limiter->new_waiters); 609} 610 611static inline struct data_vio *get_available_data_vio(struct data_vio_pool *pool) 612{ 613 struct data_vio *data_vio = 614 list_first_entry(&pool->available, struct data_vio, pool_entry); 615 616 list_del_init(&data_vio->pool_entry); 617 return data_vio; 618} 619 620static void assign_data_vio_to_waiter(struct limiter *limiter) 621{ 622 assign_data_vio(limiter, get_available_data_vio(limiter->pool)); 623} 624 625static void update_limiter(struct limiter *limiter) 626{ 627 struct bio_list *waiters = &limiter->waiters; 628 data_vio_count_t available = limiter->limit - limiter->busy; 629 630 VDO_ASSERT_LOG_ONLY((limiter->release_count <= limiter->busy), 631 "Release count %u is not more than busy count %u", 632 limiter->release_count, limiter->busy); 633 634 get_waiters(limiter); 635 for (; (limiter->release_count > 0) && !bio_list_empty(waiters); limiter->release_count--) 636 limiter->assigner(limiter); 637 638 if (limiter->release_count > 0) { 639 WRITE_ONCE(limiter->busy, limiter->busy - limiter->release_count); 640 limiter->release_count = 0; 641 return; 642 } 643 644 for (; (available > 0) && !bio_list_empty(waiters); available--) 645 limiter->assigner(limiter); 646 647 WRITE_ONCE(limiter->busy, limiter->limit - available); 648 if (limiter->max_busy < limiter->busy) 649 WRITE_ONCE(limiter->max_busy, limiter->busy); 650} 651 652/** 653 * schedule_releases() - Ensure that release processing is scheduled. 654 * 655 * If this call switches the state to processing, enqueue. 
Otherwise, some other thread has already 656 * done so. 657 */ 658static void schedule_releases(struct data_vio_pool *pool) 659{ 660 /* Pairs with the barrier in process_release_callback(). */ 661 smp_mb__before_atomic(); 662 if (atomic_cmpxchg(&pool->processing, false, true)) 663 return; 664 665 pool->completion.requeue = true; 666 vdo_launch_completion_with_priority(&pool->completion, 667 CPU_Q_COMPLETE_VIO_PRIORITY); 668} 669 670static void reuse_or_release_resources(struct data_vio_pool *pool, 671 struct data_vio *data_vio, 672 struct list_head *returned) 673{ 674 if (data_vio->remaining_discard > 0) { 675 if (bio_list_empty(&pool->discard_limiter.waiters)) { 676 /* Return the data_vio's discard permit. */ 677 pool->discard_limiter.release_count++; 678 } else { 679 assign_discard_permit(&pool->discard_limiter); 680 } 681 } 682 683 if (pool->limiter.arrival < pool->discard_limiter.arrival) { 684 assign_data_vio(&pool->limiter, data_vio); 685 } else if (pool->discard_limiter.arrival < U64_MAX) { 686 assign_data_vio(&pool->discard_limiter, data_vio); 687 } else { 688 list_add(&data_vio->pool_entry, returned); 689 pool->limiter.release_count++; 690 } 691} 692 693/** 694 * process_release_callback() - Process a batch of data_vio releases. 695 * @completion: The pool with data_vios to release. 
696 */ 697static void process_release_callback(struct vdo_completion *completion) 698{ 699 struct data_vio_pool *pool = as_data_vio_pool(completion); 700 bool reschedule; 701 bool drained; 702 data_vio_count_t processed; 703 data_vio_count_t to_wake; 704 data_vio_count_t discards_to_wake; 705 LIST_HEAD(returned); 706 707 spin_lock(&pool->lock); 708 get_waiters(&pool->discard_limiter); 709 get_waiters(&pool->limiter); 710 spin_unlock(&pool->lock); 711 712 if (pool->limiter.arrival == U64_MAX) { 713 struct bio *bio = bio_list_peek(&pool->limiter.waiters); 714 715 if (bio != NULL) 716 pool->limiter.arrival = get_arrival_time(bio); 717 } 718 719 for (processed = 0; processed < DATA_VIO_RELEASE_BATCH_SIZE; processed++) { 720 struct data_vio *data_vio; 721 struct funnel_queue_entry *entry = vdo_funnel_queue_poll(pool->queue); 722 723 if (entry == NULL) 724 break; 725 726 data_vio = as_data_vio(container_of(entry, struct vdo_completion, 727 work_queue_entry_link)); 728 acknowledge_data_vio(data_vio); 729 reuse_or_release_resources(pool, data_vio, &returned); 730 } 731 732 spin_lock(&pool->lock); 733 /* 734 * There is a race where waiters could be added while we are in the unlocked section above. 735 * Those waiters could not see the resources we are now about to release, so we assign 736 * those resources now as we have no guarantee of being rescheduled. This is handled in 737 * update_limiter(). 738 */ 739 update_limiter(&pool->discard_limiter); 740 list_splice(&returned, &pool->available); 741 update_limiter(&pool->limiter); 742 to_wake = pool->limiter.wake_count; 743 pool->limiter.wake_count = 0; 744 discards_to_wake = pool->discard_limiter.wake_count; 745 pool->discard_limiter.wake_count = 0; 746 747 atomic_set(&pool->processing, false); 748 /* Pairs with the barrier in schedule_releases(). 
*/ 749 smp_mb(); 750 751 reschedule = !vdo_is_funnel_queue_empty(pool->queue); 752 drained = (!reschedule && 753 vdo_is_state_draining(&pool->state) && 754 check_for_drain_complete_locked(pool)); 755 spin_unlock(&pool->lock); 756 757 if (to_wake > 0) 758 wake_up_nr(&pool->limiter.blocked_threads, to_wake); 759 760 if (discards_to_wake > 0) 761 wake_up_nr(&pool->discard_limiter.blocked_threads, discards_to_wake); 762 763 if (reschedule) 764 schedule_releases(pool); 765 else if (drained) 766 vdo_finish_draining(&pool->state); 767} 768 769static void initialize_limiter(struct limiter *limiter, struct data_vio_pool *pool, 770 assigner_fn assigner, data_vio_count_t limit) 771{ 772 limiter->pool = pool; 773 limiter->assigner = assigner; 774 limiter->limit = limit; 775 limiter->arrival = U64_MAX; 776 init_waitqueue_head(&limiter->blocked_threads); 777} 778 779/** 780 * initialize_data_vio() - Allocate the components of a data_vio. 781 * 782 * The caller is responsible for cleaning up the data_vio on error. 783 * 784 * Return: VDO_SUCCESS or an error. 
785 */ 786static int initialize_data_vio(struct data_vio *data_vio, struct vdo *vdo) 787{ 788 struct bio *bio; 789 int result; 790 791 BUILD_BUG_ON(VDO_BLOCK_SIZE > PAGE_SIZE); 792 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "data_vio data", 793 &data_vio->vio.data); 794 if (result != VDO_SUCCESS) 795 return vdo_log_error_strerror(result, 796 "data_vio data allocation failure"); 797 798 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "compressed block", 799 &data_vio->compression.block); 800 if (result != VDO_SUCCESS) { 801 return vdo_log_error_strerror(result, 802 "data_vio compressed block allocation failure"); 803 } 804 805 result = vdo_allocate_memory(VDO_BLOCK_SIZE, 0, "vio scratch", 806 &data_vio->scratch_block); 807 if (result != VDO_SUCCESS) 808 return vdo_log_error_strerror(result, 809 "data_vio scratch allocation failure"); 810 811 result = vdo_create_bio(&bio); 812 if (result != VDO_SUCCESS) 813 return vdo_log_error_strerror(result, 814 "data_vio data bio allocation failure"); 815 816 vdo_initialize_completion(&data_vio->decrement_completion, vdo, 817 VDO_DECREMENT_COMPLETION); 818 initialize_vio(&data_vio->vio, bio, 1, VIO_TYPE_DATA, VIO_PRIORITY_DATA, vdo); 819 820 return VDO_SUCCESS; 821} 822 823static void destroy_data_vio(struct data_vio *data_vio) 824{ 825 if (data_vio == NULL) 826 return; 827 828 vdo_free_bio(vdo_forget(data_vio->vio.bio)); 829 vdo_free(vdo_forget(data_vio->vio.data)); 830 vdo_free(vdo_forget(data_vio->compression.block)); 831 vdo_free(vdo_forget(data_vio->scratch_block)); 832} 833 834/** 835 * make_data_vio_pool() - Initialize a data_vio pool. 836 * @vdo: The vdo to which the pool will belong. 837 * @pool_size: The number of data_vios in the pool. 838 * @discard_limit: The maximum number of data_vios which may be used for discards. 839 * @pool: A pointer to hold the newly allocated pool. 
840 */ 841int make_data_vio_pool(struct vdo *vdo, data_vio_count_t pool_size, 842 data_vio_count_t discard_limit, struct data_vio_pool **pool_ptr) 843{ 844 int result; 845 struct data_vio_pool *pool; 846 data_vio_count_t i; 847 848 result = vdo_allocate_extended(struct data_vio_pool, pool_size, struct data_vio, 849 __func__, &pool); 850 if (result != VDO_SUCCESS) 851 return result; 852 853 VDO_ASSERT_LOG_ONLY((discard_limit <= pool_size), 854 "discard limit does not exceed pool size"); 855 initialize_limiter(&pool->discard_limiter, pool, assign_discard_permit, 856 discard_limit); 857 pool->discard_limiter.permitted_waiters = &pool->permitted_discards; 858 initialize_limiter(&pool->limiter, pool, assign_data_vio_to_waiter, pool_size); 859 pool->limiter.permitted_waiters = &pool->limiter.waiters; 860 INIT_LIST_HEAD(&pool->available); 861 spin_lock_init(&pool->lock); 862 vdo_set_admin_state_code(&pool->state, VDO_ADMIN_STATE_NORMAL_OPERATION); 863 vdo_initialize_completion(&pool->completion, vdo, VDO_DATA_VIO_POOL_COMPLETION); 864 vdo_prepare_completion(&pool->completion, process_release_callback, 865 process_release_callback, vdo->thread_config.cpu_thread, 866 NULL); 867 868 result = vdo_make_funnel_queue(&pool->queue); 869 if (result != VDO_SUCCESS) { 870 free_data_vio_pool(vdo_forget(pool)); 871 return result; 872 } 873 874 for (i = 0; i < pool_size; i++) { 875 struct data_vio *data_vio = &pool->data_vios[i]; 876 877 result = initialize_data_vio(data_vio, vdo); 878 if (result != VDO_SUCCESS) { 879 destroy_data_vio(data_vio); 880 free_data_vio_pool(pool); 881 return result; 882 } 883 884 list_add(&data_vio->pool_entry, &pool->available); 885 } 886 887 *pool_ptr = pool; 888 return VDO_SUCCESS; 889} 890 891/** 892 * free_data_vio_pool() - Free a data_vio_pool and the data_vios in it. 893 * 894 * All data_vios must be returned to the pool before calling this function. 
895 */ 896void free_data_vio_pool(struct data_vio_pool *pool) 897{ 898 struct data_vio *data_vio, *tmp; 899 900 if (pool == NULL) 901 return; 902 903 /* 904 * Pairs with the barrier in process_release_callback(). Possibly not needed since it 905 * caters to an enqueue vs. free race. 906 */ 907 smp_mb(); 908 BUG_ON(atomic_read(&pool->processing)); 909 910 spin_lock(&pool->lock); 911 VDO_ASSERT_LOG_ONLY((pool->limiter.busy == 0), 912 "data_vio pool must not have %u busy entries when being freed", 913 pool->limiter.busy); 914 VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->limiter.waiters) && 915 bio_list_empty(&pool->limiter.new_waiters)), 916 "data_vio pool must not have threads waiting to read or write when being freed"); 917 VDO_ASSERT_LOG_ONLY((bio_list_empty(&pool->discard_limiter.waiters) && 918 bio_list_empty(&pool->discard_limiter.new_waiters)), 919 "data_vio pool must not have threads waiting to discard when being freed"); 920 spin_unlock(&pool->lock); 921 922 list_for_each_entry_safe(data_vio, tmp, &pool->available, pool_entry) { 923 list_del_init(&data_vio->pool_entry); 924 destroy_data_vio(data_vio); 925 } 926 927 vdo_free_funnel_queue(vdo_forget(pool->queue)); 928 vdo_free(pool); 929} 930 931static bool acquire_permit(struct limiter *limiter) 932{ 933 if (limiter->busy >= limiter->limit) 934 return false; 935 936 WRITE_ONCE(limiter->busy, limiter->busy + 1); 937 if (limiter->max_busy < limiter->busy) 938 WRITE_ONCE(limiter->max_busy, limiter->busy); 939 return true; 940} 941 942static void wait_permit(struct limiter *limiter, struct bio *bio) 943 __releases(&limiter->pool->lock) 944{ 945 DEFINE_WAIT(wait); 946 947 bio_list_add(&limiter->new_waiters, bio); 948 prepare_to_wait_exclusive(&limiter->blocked_threads, &wait, 949 TASK_UNINTERRUPTIBLE); 950 spin_unlock(&limiter->pool->lock); 951 io_schedule(); 952 finish_wait(&limiter->blocked_threads, &wait); 953} 954 955/** 956 * vdo_launch_bio() - Acquire a data_vio from the pool, assign the bio to it, and launch 
it. 957 * 958 * This will block if data_vios or discard permits are not available. 959 */ 960void vdo_launch_bio(struct data_vio_pool *pool, struct bio *bio) 961{ 962 struct data_vio *data_vio; 963 964 VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&pool->state), 965 "data_vio_pool not quiescent on acquire"); 966 967 bio->bi_private = (void *) jiffies; 968 spin_lock(&pool->lock); 969 if ((bio_op(bio) == REQ_OP_DISCARD) && 970 !acquire_permit(&pool->discard_limiter)) { 971 wait_permit(&pool->discard_limiter, bio); 972 return; 973 } 974 975 if (!acquire_permit(&pool->limiter)) { 976 wait_permit(&pool->limiter, bio); 977 return; 978 } 979 980 data_vio = get_available_data_vio(pool); 981 spin_unlock(&pool->lock); 982 launch_bio(pool->completion.vdo, data_vio, bio); 983} 984 985/* Implements vdo_admin_initiator_fn. */ 986static void initiate_drain(struct admin_state *state) 987{ 988 bool drained; 989 struct data_vio_pool *pool = container_of(state, struct data_vio_pool, state); 990 991 spin_lock(&pool->lock); 992 drained = check_for_drain_complete_locked(pool); 993 spin_unlock(&pool->lock); 994 995 if (drained) 996 vdo_finish_draining(state); 997} 998 999static void assert_on_vdo_cpu_thread(const struct vdo *vdo, const char *name) 1000{ 1001 VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.cpu_thread), 1002 "%s called on cpu thread", name); 1003} 1004 1005/** 1006 * drain_data_vio_pool() - Wait asynchronously for all data_vios to be returned to the pool. 1007 * @completion: The completion to notify when the pool has drained. 1008 */ 1009void drain_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion) 1010{ 1011 assert_on_vdo_cpu_thread(completion->vdo, __func__); 1012 vdo_start_draining(&pool->state, VDO_ADMIN_STATE_SUSPENDING, completion, 1013 initiate_drain); 1014} 1015 1016/** 1017 * resume_data_vio_pool() - Resume a data_vio pool. 1018 * @completion: The completion to notify when the pool has resumed. 
1019 */ 1020void resume_data_vio_pool(struct data_vio_pool *pool, struct vdo_completion *completion) 1021{ 1022 assert_on_vdo_cpu_thread(completion->vdo, __func__); 1023 vdo_continue_completion(completion, vdo_resume_if_quiescent(&pool->state)); 1024} 1025 1026static void dump_limiter(const char *name, struct limiter *limiter) 1027{ 1028 vdo_log_info("%s: %u of %u busy (max %u), %s", name, limiter->busy, 1029 limiter->limit, limiter->max_busy, 1030 ((bio_list_empty(&limiter->waiters) && 1031 bio_list_empty(&limiter->new_waiters)) ? 1032 "no waiters" : "has waiters")); 1033} 1034 1035/** 1036 * dump_data_vio_pool() - Dump a data_vio pool to the log. 1037 * @dump_vios: Whether to dump the details of each busy data_vio as well. 1038 */ 1039void dump_data_vio_pool(struct data_vio_pool *pool, bool dump_vios) 1040{ 1041 /* 1042 * In order that syslog can empty its buffer, sleep after 35 elements for 4ms (till the 1043 * second clock tick). These numbers were picked based on experiments with lab machines. 
1044 */ 1045 static const int ELEMENTS_PER_BATCH = 35; 1046 static const int SLEEP_FOR_SYSLOG = 4000; 1047 1048 if (pool == NULL) 1049 return; 1050 1051 spin_lock(&pool->lock); 1052 dump_limiter("data_vios", &pool->limiter); 1053 dump_limiter("discard permits", &pool->discard_limiter); 1054 if (dump_vios) { 1055 int i; 1056 int dumped = 0; 1057 1058 for (i = 0; i < pool->limiter.limit; i++) { 1059 struct data_vio *data_vio = &pool->data_vios[i]; 1060 1061 if (!list_empty(&data_vio->pool_entry)) 1062 continue; 1063 1064 dump_data_vio(data_vio); 1065 if (++dumped >= ELEMENTS_PER_BATCH) { 1066 spin_unlock(&pool->lock); 1067 dumped = 0; 1068 fsleep(SLEEP_FOR_SYSLOG); 1069 spin_lock(&pool->lock); 1070 } 1071 } 1072 } 1073 1074 spin_unlock(&pool->lock); 1075} 1076 1077data_vio_count_t get_data_vio_pool_active_discards(struct data_vio_pool *pool) 1078{ 1079 return READ_ONCE(pool->discard_limiter.busy); 1080} 1081 1082data_vio_count_t get_data_vio_pool_discard_limit(struct data_vio_pool *pool) 1083{ 1084 return READ_ONCE(pool->discard_limiter.limit); 1085} 1086 1087data_vio_count_t get_data_vio_pool_maximum_discards(struct data_vio_pool *pool) 1088{ 1089 return READ_ONCE(pool->discard_limiter.max_busy); 1090} 1091 1092int set_data_vio_pool_discard_limit(struct data_vio_pool *pool, data_vio_count_t limit) 1093{ 1094 if (get_data_vio_pool_request_limit(pool) < limit) { 1095 // The discard limit may not be higher than the data_vio limit. 
1096 return -EINVAL; 1097 } 1098 1099 spin_lock(&pool->lock); 1100 pool->discard_limiter.limit = limit; 1101 spin_unlock(&pool->lock); 1102 1103 return VDO_SUCCESS; 1104} 1105 1106data_vio_count_t get_data_vio_pool_active_requests(struct data_vio_pool *pool) 1107{ 1108 return READ_ONCE(pool->limiter.busy); 1109} 1110 1111data_vio_count_t get_data_vio_pool_request_limit(struct data_vio_pool *pool) 1112{ 1113 return READ_ONCE(pool->limiter.limit); 1114} 1115 1116data_vio_count_t get_data_vio_pool_maximum_requests(struct data_vio_pool *pool) 1117{ 1118 return READ_ONCE(pool->limiter.max_busy); 1119} 1120 1121static void update_data_vio_error_stats(struct data_vio *data_vio) 1122{ 1123 u8 index = 0; 1124 static const char * const operations[] = { 1125 [0] = "empty", 1126 [1] = "read", 1127 [2] = "write", 1128 [3] = "read-modify-write", 1129 [5] = "read+fua", 1130 [6] = "write+fua", 1131 [7] = "read-modify-write+fua", 1132 }; 1133 1134 if (data_vio->read) 1135 index = 1; 1136 1137 if (data_vio->write) 1138 index += 2; 1139 1140 if (data_vio->fua) 1141 index += 4; 1142 1143 update_vio_error_stats(&data_vio->vio, 1144 "Completing %s vio for LBN %llu with error after %s", 1145 operations[index], 1146 (unsigned long long) data_vio->logical.lbn, 1147 get_data_vio_operation_name(data_vio)); 1148} 1149 1150static void perform_cleanup_stage(struct data_vio *data_vio, 1151 enum data_vio_cleanup_stage stage); 1152 1153/** 1154 * release_allocated_lock() - Release the PBN lock and/or the reference on the allocated block at 1155 * the end of processing a data_vio. 1156 */ 1157static void release_allocated_lock(struct vdo_completion *completion) 1158{ 1159 struct data_vio *data_vio = as_data_vio(completion); 1160 1161 assert_data_vio_in_allocated_zone(data_vio); 1162 release_data_vio_allocation_lock(data_vio, false); 1163 perform_cleanup_stage(data_vio, VIO_RELEASE_RECOVERY_LOCKS); 1164} 1165 1166/** release_lock() - Release an uncontended LBN lock. 
*/ 1167static void release_lock(struct data_vio *data_vio, struct lbn_lock *lock) 1168{ 1169 struct int_map *lock_map = lock->zone->lbn_operations; 1170 struct data_vio *lock_holder; 1171 1172 if (!lock->locked) { 1173 /* The lock is not locked, so it had better not be registered in the lock map. */ 1174 struct data_vio *lock_holder = vdo_int_map_get(lock_map, lock->lbn); 1175 1176 VDO_ASSERT_LOG_ONLY((data_vio != lock_holder), 1177 "no logical block lock held for block %llu", 1178 (unsigned long long) lock->lbn); 1179 return; 1180 } 1181 1182 /* Release the lock by removing the lock from the map. */ 1183 lock_holder = vdo_int_map_remove(lock_map, lock->lbn); 1184 VDO_ASSERT_LOG_ONLY((data_vio == lock_holder), 1185 "logical block lock mismatch for block %llu", 1186 (unsigned long long) lock->lbn); 1187 lock->locked = false; 1188} 1189 1190/** transfer_lock() - Transfer a contended LBN lock to the eldest waiter. */ 1191static void transfer_lock(struct data_vio *data_vio, struct lbn_lock *lock) 1192{ 1193 struct data_vio *lock_holder, *next_lock_holder; 1194 int result; 1195 1196 VDO_ASSERT_LOG_ONLY(lock->locked, "lbn_lock with waiters is not locked"); 1197 1198 /* Another data_vio is waiting for the lock, transfer it in a single lock map operation. */ 1199 next_lock_holder = 1200 vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&lock->waiters)); 1201 1202 /* Transfer the remaining lock waiters to the next lock holder. 
*/ 1203 vdo_waitq_transfer_all_waiters(&lock->waiters, 1204 &next_lock_holder->logical.waiters); 1205 1206 result = vdo_int_map_put(lock->zone->lbn_operations, lock->lbn, 1207 next_lock_holder, true, (void **) &lock_holder); 1208 if (result != VDO_SUCCESS) { 1209 continue_data_vio_with_error(next_lock_holder, result); 1210 return; 1211 } 1212 1213 VDO_ASSERT_LOG_ONLY((lock_holder == data_vio), 1214 "logical block lock mismatch for block %llu", 1215 (unsigned long long) lock->lbn); 1216 lock->locked = false; 1217 1218 /* 1219 * If there are still waiters, other data_vios must be trying to get the lock we just 1220 * transferred. We must ensure that the new lock holder doesn't block in the packer. 1221 */ 1222 if (vdo_waitq_has_waiters(&next_lock_holder->logical.waiters)) 1223 cancel_data_vio_compression(next_lock_holder); 1224 1225 /* 1226 * Avoid stack overflow on lock transfer. 1227 * FIXME: this is only an issue in the 1 thread config. 1228 */ 1229 next_lock_holder->vio.completion.requeue = true; 1230 launch_locked_request(next_lock_holder); 1231} 1232 1233/** 1234 * release_logical_lock() - Release the logical block lock and flush generation lock at the end of 1235 * processing a data_vio. 1236 */ 1237static void release_logical_lock(struct vdo_completion *completion) 1238{ 1239 struct data_vio *data_vio = as_data_vio(completion); 1240 struct lbn_lock *lock = &data_vio->logical; 1241 1242 assert_data_vio_in_logical_zone(data_vio); 1243 1244 if (vdo_waitq_has_waiters(&lock->waiters)) 1245 transfer_lock(data_vio, lock); 1246 else 1247 release_lock(data_vio, lock); 1248 1249 vdo_release_flush_generation_lock(data_vio); 1250 perform_cleanup_stage(data_vio, VIO_CLEANUP_DONE); 1251} 1252 1253/** clean_hash_lock() - Release the hash lock at the end of processing a data_vio. 
*/ 1254static void clean_hash_lock(struct vdo_completion *completion) 1255{ 1256 struct data_vio *data_vio = as_data_vio(completion); 1257 1258 assert_data_vio_in_hash_zone(data_vio); 1259 if (completion->result != VDO_SUCCESS) { 1260 vdo_clean_failed_hash_lock(data_vio); 1261 return; 1262 } 1263 1264 vdo_release_hash_lock(data_vio); 1265 perform_cleanup_stage(data_vio, VIO_RELEASE_LOGICAL); 1266} 1267 1268/** 1269 * finish_cleanup() - Make some assertions about a data_vio which has finished cleaning up. 1270 * 1271 * If it is part of a multi-block discard, starts on the next block, otherwise, returns it to the 1272 * pool. 1273 */ 1274static void finish_cleanup(struct data_vio *data_vio) 1275{ 1276 struct vdo_completion *completion = &data_vio->vio.completion; 1277 1278 VDO_ASSERT_LOG_ONLY(data_vio->allocation.lock == NULL, 1279 "complete data_vio has no allocation lock"); 1280 VDO_ASSERT_LOG_ONLY(data_vio->hash_lock == NULL, 1281 "complete data_vio has no hash lock"); 1282 if ((data_vio->remaining_discard <= VDO_BLOCK_SIZE) || 1283 (completion->result != VDO_SUCCESS)) { 1284 struct data_vio_pool *pool = completion->vdo->data_vio_pool; 1285 1286 vdo_funnel_queue_put(pool->queue, &completion->work_queue_entry_link); 1287 schedule_releases(pool); 1288 return; 1289 } 1290 1291 data_vio->remaining_discard -= min_t(u32, data_vio->remaining_discard, 1292 VDO_BLOCK_SIZE - data_vio->offset); 1293 data_vio->is_partial = (data_vio->remaining_discard < VDO_BLOCK_SIZE); 1294 data_vio->read = data_vio->is_partial; 1295 data_vio->offset = 0; 1296 completion->requeue = true; 1297 launch_data_vio(data_vio, data_vio->logical.lbn + 1); 1298} 1299 1300/** perform_cleanup_stage() - Perform the next step in the process of cleaning up a data_vio. 
*/ 1301static void perform_cleanup_stage(struct data_vio *data_vio, 1302 enum data_vio_cleanup_stage stage) 1303{ 1304 struct vdo *vdo = vdo_from_data_vio(data_vio); 1305 1306 switch (stage) { 1307 case VIO_RELEASE_HASH_LOCK: 1308 if (data_vio->hash_lock != NULL) { 1309 launch_data_vio_hash_zone_callback(data_vio, clean_hash_lock); 1310 return; 1311 } 1312 fallthrough; 1313 1314 case VIO_RELEASE_ALLOCATED: 1315 if (data_vio_has_allocation(data_vio)) { 1316 launch_data_vio_allocated_zone_callback(data_vio, 1317 release_allocated_lock); 1318 return; 1319 } 1320 fallthrough; 1321 1322 case VIO_RELEASE_RECOVERY_LOCKS: 1323 if ((data_vio->recovery_sequence_number > 0) && 1324 (READ_ONCE(vdo->read_only_notifier.read_only_error) == VDO_SUCCESS) && 1325 (data_vio->vio.completion.result != VDO_READ_ONLY)) 1326 vdo_log_warning("VDO not read-only when cleaning data_vio with RJ lock"); 1327 fallthrough; 1328 1329 case VIO_RELEASE_LOGICAL: 1330 launch_data_vio_logical_callback(data_vio, release_logical_lock); 1331 return; 1332 1333 default: 1334 finish_cleanup(data_vio); 1335 } 1336} 1337 1338void complete_data_vio(struct vdo_completion *completion) 1339{ 1340 struct data_vio *data_vio = as_data_vio(completion); 1341 1342 completion->error_handler = NULL; 1343 data_vio->last_async_operation = VIO_ASYNC_OP_CLEANUP; 1344 perform_cleanup_stage(data_vio, 1345 (data_vio->write ? 
VIO_CLEANUP_START : VIO_RELEASE_LOGICAL)); 1346} 1347 1348static void enter_read_only_mode(struct vdo_completion *completion) 1349{ 1350 if (vdo_is_read_only(completion->vdo)) 1351 return; 1352 1353 if (completion->result != VDO_READ_ONLY) { 1354 struct data_vio *data_vio = as_data_vio(completion); 1355 1356 vdo_log_error_strerror(completion->result, 1357 "Preparing to enter read-only mode: data_vio for LBN %llu (becoming mapped to %llu, previously mapped to %llu, allocated %llu) is completing with a fatal error after operation %s", 1358 (unsigned long long) data_vio->logical.lbn, 1359 (unsigned long long) data_vio->new_mapped.pbn, 1360 (unsigned long long) data_vio->mapped.pbn, 1361 (unsigned long long) data_vio->allocation.pbn, 1362 get_data_vio_operation_name(data_vio)); 1363 } 1364 1365 vdo_enter_read_only_mode(completion->vdo, completion->result); 1366} 1367 1368void handle_data_vio_error(struct vdo_completion *completion) 1369{ 1370 struct data_vio *data_vio = as_data_vio(completion); 1371 1372 if ((completion->result == VDO_READ_ONLY) || (data_vio->user_bio == NULL)) 1373 enter_read_only_mode(completion); 1374 1375 update_data_vio_error_stats(data_vio); 1376 complete_data_vio(completion); 1377} 1378 1379/** 1380 * get_data_vio_operation_name() - Get the name of the last asynchronous operation performed on a 1381 * data_vio. 1382 */ 1383const char *get_data_vio_operation_name(struct data_vio *data_vio) 1384{ 1385 BUILD_BUG_ON((MAX_VIO_ASYNC_OPERATION_NUMBER - MIN_VIO_ASYNC_OPERATION_NUMBER) != 1386 ARRAY_SIZE(ASYNC_OPERATION_NAMES)); 1387 1388 return ((data_vio->last_async_operation < MAX_VIO_ASYNC_OPERATION_NUMBER) ? 1389 ASYNC_OPERATION_NAMES[data_vio->last_async_operation] : 1390 "unknown async operation"); 1391} 1392 1393/** 1394 * data_vio_allocate_data_block() - Allocate a data block. 1395 * 1396 * @write_lock_type: The type of write lock to obtain on the block. 
1397 * @callback: The callback which will attempt an allocation in the current zone and continue if it 1398 * succeeds. 1399 * @error_handler: The handler for errors while allocating. 1400 */ 1401void data_vio_allocate_data_block(struct data_vio *data_vio, 1402 enum pbn_lock_type write_lock_type, 1403 vdo_action_fn callback, vdo_action_fn error_handler) 1404{ 1405 struct allocation *allocation = &data_vio->allocation; 1406 1407 VDO_ASSERT_LOG_ONLY((allocation->pbn == VDO_ZERO_BLOCK), 1408 "data_vio does not have an allocation"); 1409 allocation->write_lock_type = write_lock_type; 1410 allocation->zone = vdo_get_next_allocation_zone(data_vio->logical.zone); 1411 allocation->first_allocation_zone = allocation->zone->zone_number; 1412 1413 data_vio->vio.completion.error_handler = error_handler; 1414 launch_data_vio_allocated_zone_callback(data_vio, callback); 1415} 1416 1417/** 1418 * release_data_vio_allocation_lock() - Release the PBN lock on a data_vio's allocated block. 1419 * @reset: If true, the allocation will be reset (i.e. any allocated pbn will be forgotten). 1420 * 1421 * If the reference to the locked block is still provisional, it will be released as well. 1422 */ 1423void release_data_vio_allocation_lock(struct data_vio *data_vio, bool reset) 1424{ 1425 struct allocation *allocation = &data_vio->allocation; 1426 physical_block_number_t locked_pbn = allocation->pbn; 1427 1428 assert_data_vio_in_allocated_zone(data_vio); 1429 1430 if (reset || vdo_pbn_lock_has_provisional_reference(allocation->lock)) 1431 allocation->pbn = VDO_ZERO_BLOCK; 1432 1433 vdo_release_physical_zone_pbn_lock(allocation->zone, locked_pbn, 1434 vdo_forget(allocation->lock)); 1435} 1436 1437/** 1438 * uncompress_data_vio() - Uncompress the data a data_vio has just read. 1439 * @mapping_state: The mapping state indicating which fragment to decompress. 1440 * @buffer: The buffer to receive the uncompressed data. 
1441 */ 1442int uncompress_data_vio(struct data_vio *data_vio, 1443 enum block_mapping_state mapping_state, char *buffer) 1444{ 1445 int size; 1446 u16 fragment_offset, fragment_size; 1447 struct compressed_block *block = data_vio->compression.block; 1448 int result = vdo_get_compressed_block_fragment(mapping_state, block, 1449 &fragment_offset, &fragment_size); 1450 1451 if (result != VDO_SUCCESS) { 1452 vdo_log_debug("%s: compressed fragment error %d", __func__, result); 1453 return result; 1454 } 1455 1456 size = LZ4_decompress_safe((block->data + fragment_offset), buffer, 1457 fragment_size, VDO_BLOCK_SIZE); 1458 if (size != VDO_BLOCK_SIZE) { 1459 vdo_log_debug("%s: lz4 error", __func__); 1460 return VDO_INVALID_FRAGMENT; 1461 } 1462 1463 return VDO_SUCCESS; 1464} 1465 1466/** 1467 * modify_for_partial_write() - Do the modify-write part of a read-modify-write cycle. 1468 * @completion: The data_vio which has just finished its read. 1469 * 1470 * This callback is registered in read_block(). 
1471 */ 1472static void modify_for_partial_write(struct vdo_completion *completion) 1473{ 1474 struct data_vio *data_vio = as_data_vio(completion); 1475 char *data = data_vio->vio.data; 1476 struct bio *bio = data_vio->user_bio; 1477 1478 assert_data_vio_on_cpu_thread(data_vio); 1479 1480 if (bio_op(bio) == REQ_OP_DISCARD) { 1481 memset(data + data_vio->offset, '\0', min_t(u32, 1482 data_vio->remaining_discard, 1483 VDO_BLOCK_SIZE - data_vio->offset)); 1484 } else { 1485 copy_from_bio(bio, data + data_vio->offset); 1486 } 1487 1488 data_vio->is_zero = is_zero_block(data); 1489 data_vio->read = false; 1490 launch_data_vio_logical_callback(data_vio, 1491 continue_data_vio_with_block_map_slot); 1492} 1493 1494static void complete_read(struct vdo_completion *completion) 1495{ 1496 struct data_vio *data_vio = as_data_vio(completion); 1497 char *data = data_vio->vio.data; 1498 bool compressed = vdo_is_state_compressed(data_vio->mapped.state); 1499 1500 assert_data_vio_on_cpu_thread(data_vio); 1501 1502 if (compressed) { 1503 int result = uncompress_data_vio(data_vio, data_vio->mapped.state, data); 1504 1505 if (result != VDO_SUCCESS) { 1506 continue_data_vio_with_error(data_vio, result); 1507 return; 1508 } 1509 } 1510 1511 if (data_vio->write) { 1512 modify_for_partial_write(completion); 1513 return; 1514 } 1515 1516 if (compressed || data_vio->is_partial) 1517 copy_to_bio(data_vio->user_bio, data + data_vio->offset); 1518 1519 acknowledge_data_vio(data_vio); 1520 complete_data_vio(completion); 1521} 1522 1523static void read_endio(struct bio *bio) 1524{ 1525 struct data_vio *data_vio = vio_as_data_vio(bio->bi_private); 1526 int result = blk_status_to_errno(bio->bi_status); 1527 1528 vdo_count_completed_bios(bio); 1529 if (result != VDO_SUCCESS) { 1530 continue_data_vio_with_error(data_vio, result); 1531 return; 1532 } 1533 1534 launch_data_vio_cpu_callback(data_vio, complete_read, 1535 CPU_Q_COMPLETE_READ_PRIORITY); 1536} 1537 1538static void complete_zero_read(struct 
vdo_completion *completion) 1539{ 1540 struct data_vio *data_vio = as_data_vio(completion); 1541 1542 assert_data_vio_on_cpu_thread(data_vio); 1543 1544 if (data_vio->is_partial) { 1545 memset(data_vio->vio.data, 0, VDO_BLOCK_SIZE); 1546 if (data_vio->write) { 1547 modify_for_partial_write(completion); 1548 return; 1549 } 1550 } else { 1551 zero_fill_bio(data_vio->user_bio); 1552 } 1553 1554 complete_read(completion); 1555} 1556 1557/** 1558 * read_block() - Read a block asynchronously. 1559 * 1560 * This is the callback registered in read_block_mapping(). 1561 */ 1562static void read_block(struct vdo_completion *completion) 1563{ 1564 struct data_vio *data_vio = as_data_vio(completion); 1565 struct vio *vio = as_vio(completion); 1566 int result = VDO_SUCCESS; 1567 1568 if (data_vio->mapped.pbn == VDO_ZERO_BLOCK) { 1569 launch_data_vio_cpu_callback(data_vio, complete_zero_read, 1570 CPU_Q_COMPLETE_VIO_PRIORITY); 1571 return; 1572 } 1573 1574 data_vio->last_async_operation = VIO_ASYNC_OP_READ_DATA_VIO; 1575 if (vdo_is_state_compressed(data_vio->mapped.state)) { 1576 result = vio_reset_bio(vio, (char *) data_vio->compression.block, 1577 read_endio, REQ_OP_READ, data_vio->mapped.pbn); 1578 } else { 1579 blk_opf_t opf = ((data_vio->user_bio->bi_opf & PASSTHROUGH_FLAGS) | REQ_OP_READ); 1580 1581 if (data_vio->is_partial) { 1582 result = vio_reset_bio(vio, vio->data, read_endio, opf, 1583 data_vio->mapped.pbn); 1584 } else { 1585 /* A full 4k read. Use the incoming bio to avoid having to copy the data */ 1586 bio_reset(vio->bio, vio->bio->bi_bdev, opf); 1587 bio_init_clone(data_vio->user_bio->bi_bdev, vio->bio, 1588 data_vio->user_bio, GFP_KERNEL); 1589 1590 /* Copy over the original bio iovec and opflags. 
*/ 1591 vdo_set_bio_properties(vio->bio, vio, read_endio, opf, 1592 data_vio->mapped.pbn); 1593 } 1594 } 1595 1596 if (result != VDO_SUCCESS) { 1597 continue_data_vio_with_error(data_vio, result); 1598 return; 1599 } 1600 1601 vdo_submit_data_vio(data_vio); 1602} 1603 1604static inline struct data_vio * 1605reference_count_update_completion_as_data_vio(struct vdo_completion *completion) 1606{ 1607 if (completion->type == VIO_COMPLETION) 1608 return as_data_vio(completion); 1609 1610 return container_of(completion, struct data_vio, decrement_completion); 1611} 1612 1613/** 1614 * update_block_map() - Rendezvous of the data_vio and decrement completions after each has 1615 * made its reference updates. Handle any error from either, or proceed 1616 * to updating the block map. 1617 * @completion: The completion of the write in progress. 1618 */ 1619static void update_block_map(struct vdo_completion *completion) 1620{ 1621 struct data_vio *data_vio = reference_count_update_completion_as_data_vio(completion); 1622 1623 assert_data_vio_in_logical_zone(data_vio); 1624 1625 if (!data_vio->first_reference_operation_complete) { 1626 /* Rendezvous, we're first */ 1627 data_vio->first_reference_operation_complete = true; 1628 return; 1629 } 1630 1631 completion = &data_vio->vio.completion; 1632 vdo_set_completion_result(completion, data_vio->decrement_completion.result); 1633 if (completion->result != VDO_SUCCESS) { 1634 handle_data_vio_error(completion); 1635 return; 1636 } 1637 1638 completion->error_handler = handle_data_vio_error; 1639 if (data_vio->hash_lock != NULL) 1640 set_data_vio_hash_zone_callback(data_vio, vdo_continue_hash_lock); 1641 else 1642 completion->callback = complete_data_vio; 1643 1644 data_vio->last_async_operation = VIO_ASYNC_OP_PUT_MAPPED_BLOCK; 1645 vdo_put_mapped_block(data_vio); 1646} 1647 1648static void decrement_reference_count(struct vdo_completion *completion) 1649{ 1650 struct data_vio *data_vio = container_of(completion, struct data_vio, 
1651 decrement_completion); 1652 1653 assert_data_vio_in_mapped_zone(data_vio); 1654 1655 vdo_set_completion_callback(completion, update_block_map, 1656 data_vio->logical.zone->thread_id); 1657 completion->error_handler = update_block_map; 1658 vdo_modify_reference_count(completion, &data_vio->decrement_updater); 1659} 1660 1661static void increment_reference_count(struct vdo_completion *completion) 1662{ 1663 struct data_vio *data_vio = as_data_vio(completion); 1664 1665 assert_data_vio_in_new_mapped_zone(data_vio); 1666 1667 if (data_vio->downgrade_allocation_lock) { 1668 /* 1669 * Now that the data has been written, it's safe to deduplicate against the 1670 * block. Downgrade the allocation lock to a read lock so it can be used later by 1671 * the hash lock. This is done here since it needs to happen sometime before we 1672 * return to the hash zone, and we are currently on the correct thread. For 1673 * compressed blocks, the downgrade will have already been done. 1674 */ 1675 vdo_downgrade_pbn_write_lock(data_vio->allocation.lock, false); 1676 } 1677 1678 set_data_vio_logical_callback(data_vio, update_block_map); 1679 completion->error_handler = update_block_map; 1680 vdo_modify_reference_count(completion, &data_vio->increment_updater); 1681} 1682 1683/** journal_remapping() - Add a recovery journal entry for a data remapping. 
*/
static void journal_remapping(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	bool needs_increment, needs_decrement;

	assert_data_vio_in_journal_zone(data_vio);

	data_vio->decrement_updater.operation = VDO_JOURNAL_DATA_REMAPPING;
	data_vio->decrement_updater.zpbn = data_vio->mapped;

	/* A mapping to the zero block carries no reference count to adjust. */
	needs_increment = (data_vio->new_mapped.pbn != VDO_ZERO_BLOCK);
	needs_decrement = (data_vio->mapped.pbn != VDO_ZERO_BLOCK);

	/*
	 * If either update is skipped, mark the rendezvous in update_block_map() as already
	 * half-complete.
	 */
	if (!needs_increment || !needs_decrement)
		data_vio->first_reference_operation_complete = true;

	if (needs_increment)
		set_data_vio_new_mapped_zone_callback(data_vio, increment_reference_count);
	else if (!needs_decrement)
		set_data_vio_logical_callback(data_vio, update_block_map);

	if (needs_decrement) {
		vdo_set_completion_callback(&data_vio->decrement_completion,
					    decrement_reference_count,
					    data_vio->mapped.zone->thread_id);
	}

	data_vio->last_async_operation = VIO_ASYNC_OP_JOURNAL_REMAPPING;
	vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
}

/**
 * read_old_block_mapping() - Get the previous PBN/LBN mapping of an in-progress write.
 *
 * Gets the previous PBN mapped to this LBN from the block map, so as to make an appropriate
 * journal entry referencing the removal of this LBN->PBN mapping.
1718 */ 1719static void read_old_block_mapping(struct vdo_completion *completion) 1720{ 1721 struct data_vio *data_vio = as_data_vio(completion); 1722 1723 assert_data_vio_in_logical_zone(data_vio); 1724 1725 data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_WRITE; 1726 set_data_vio_journal_callback(data_vio, journal_remapping); 1727 vdo_get_mapped_block(data_vio); 1728} 1729 1730void update_metadata_for_data_vio_write(struct data_vio *data_vio, struct pbn_lock *lock) 1731{ 1732 data_vio->increment_updater = (struct reference_updater) { 1733 .operation = VDO_JOURNAL_DATA_REMAPPING, 1734 .increment = true, 1735 .zpbn = data_vio->new_mapped, 1736 .lock = lock, 1737 }; 1738 1739 launch_data_vio_logical_callback(data_vio, read_old_block_mapping); 1740} 1741 1742/** 1743 * pack_compressed_data() - Attempt to pack the compressed data_vio into a block. 1744 * 1745 * This is the callback registered in launch_compress_data_vio(). 1746 */ 1747static void pack_compressed_data(struct vdo_completion *completion) 1748{ 1749 struct data_vio *data_vio = as_data_vio(completion); 1750 1751 assert_data_vio_in_packer_zone(data_vio); 1752 1753 if (!vdo_get_compressing(vdo_from_data_vio(data_vio)) || 1754 get_data_vio_compression_status(data_vio).may_not_compress) { 1755 write_data_vio(data_vio); 1756 return; 1757 } 1758 1759 data_vio->last_async_operation = VIO_ASYNC_OP_ATTEMPT_PACKING; 1760 vdo_attempt_packing(data_vio); 1761} 1762 1763/** 1764 * compress_data_vio() - Do the actual work of compressing the data on a CPU queue. 1765 * 1766 * This callback is registered in launch_compress_data_vio(). 
1767 */ 1768static void compress_data_vio(struct vdo_completion *completion) 1769{ 1770 struct data_vio *data_vio = as_data_vio(completion); 1771 int size; 1772 1773 assert_data_vio_on_cpu_thread(data_vio); 1774 1775 /* 1776 * By putting the compressed data at the start of the compressed block data field, we won't 1777 * need to copy it if this data_vio becomes a compressed write agent. 1778 */ 1779 size = LZ4_compress_default(data_vio->vio.data, 1780 data_vio->compression.block->data, VDO_BLOCK_SIZE, 1781 VDO_MAX_COMPRESSED_FRAGMENT_SIZE, 1782 (char *) vdo_get_work_queue_private_data()); 1783 if ((size > 0) && (size < VDO_COMPRESSED_BLOCK_DATA_SIZE)) { 1784 data_vio->compression.size = size; 1785 launch_data_vio_packer_callback(data_vio, pack_compressed_data); 1786 return; 1787 } 1788 1789 write_data_vio(data_vio); 1790} 1791 1792/** 1793 * launch_compress_data_vio() - Continue a write by attempting to compress the data. 1794 * 1795 * This is a re-entry point to vio_write used by hash locks. 1796 */ 1797void launch_compress_data_vio(struct data_vio *data_vio) 1798{ 1799 VDO_ASSERT_LOG_ONLY(!data_vio->is_duplicate, "compressing a non-duplicate block"); 1800 VDO_ASSERT_LOG_ONLY(data_vio->hash_lock != NULL, 1801 "data_vio to compress has a hash_lock"); 1802 VDO_ASSERT_LOG_ONLY(data_vio_has_allocation(data_vio), 1803 "data_vio to compress has an allocation"); 1804 1805 /* 1806 * There are 4 reasons why a data_vio which has reached this point will not be eligible for 1807 * compression: 1808 * 1809 * 1) Since data_vios can block indefinitely in the packer, it would be bad to do so if the 1810 * write request also requests FUA. 1811 * 1812 * 2) A data_vio should not be compressed when compression is disabled for the vdo. 1813 * 1814 * 3) A data_vio could be doing a partial write on behalf of a larger discard which has not 1815 * yet been acknowledged and hence blocking in the packer would be bad. 
1816 * 1817 * 4) Some other data_vio may be waiting on this data_vio in which case blocking in the 1818 * packer would also be bad. 1819 */ 1820 if (data_vio->fua || 1821 !vdo_get_compressing(vdo_from_data_vio(data_vio)) || 1822 ((data_vio->user_bio != NULL) && (bio_op(data_vio->user_bio) == REQ_OP_DISCARD)) || 1823 (advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_COMPRESSING)) { 1824 write_data_vio(data_vio); 1825 return; 1826 } 1827 1828 data_vio->last_async_operation = VIO_ASYNC_OP_COMPRESS_DATA_VIO; 1829 launch_data_vio_cpu_callback(data_vio, compress_data_vio, 1830 CPU_Q_COMPRESS_BLOCK_PRIORITY); 1831} 1832 1833/** 1834 * hash_data_vio() - Hash the data in a data_vio and set the hash zone (which also flags the record 1835 * name as set). 1836 1837 * This callback is registered in prepare_for_dedupe(). 1838 */ 1839static void hash_data_vio(struct vdo_completion *completion) 1840{ 1841 struct data_vio *data_vio = as_data_vio(completion); 1842 1843 assert_data_vio_on_cpu_thread(data_vio); 1844 VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "zero blocks should not be hashed"); 1845 1846 murmurhash3_128(data_vio->vio.data, VDO_BLOCK_SIZE, 0x62ea60be, 1847 &data_vio->record_name); 1848 1849 data_vio->hash_zone = vdo_select_hash_zone(vdo_from_data_vio(data_vio)->hash_zones, 1850 &data_vio->record_name); 1851 data_vio->last_async_operation = VIO_ASYNC_OP_ACQUIRE_VDO_HASH_LOCK; 1852 launch_data_vio_hash_zone_callback(data_vio, vdo_acquire_hash_lock); 1853} 1854 1855/** prepare_for_dedupe() - Prepare for the dedupe path after attempting to get an allocation. */ 1856static void prepare_for_dedupe(struct data_vio *data_vio) 1857{ 1858 /* We don't care what thread we are on. */ 1859 VDO_ASSERT_LOG_ONLY(!data_vio->is_zero, "must not prepare to dedupe zero blocks"); 1860 1861 /* 1862 * Before we can dedupe, we need to know the record name, so the first 1863 * step is to hash the block data. 
1864 */ 1865 data_vio->last_async_operation = VIO_ASYNC_OP_HASH_DATA_VIO; 1866 launch_data_vio_cpu_callback(data_vio, hash_data_vio, CPU_Q_HASH_BLOCK_PRIORITY); 1867} 1868 1869/** 1870 * write_bio_finished() - This is the bio_end_io function registered in write_block() to be called 1871 * when a data_vio's write to the underlying storage has completed. 1872 */ 1873static void write_bio_finished(struct bio *bio) 1874{ 1875 struct data_vio *data_vio = vio_as_data_vio((struct vio *) bio->bi_private); 1876 1877 vdo_count_completed_bios(bio); 1878 vdo_set_completion_result(&data_vio->vio.completion, 1879 blk_status_to_errno(bio->bi_status)); 1880 data_vio->downgrade_allocation_lock = true; 1881 update_metadata_for_data_vio_write(data_vio, data_vio->allocation.lock); 1882} 1883 1884/** write_data_vio() - Write a data block to storage without compression. */ 1885void write_data_vio(struct data_vio *data_vio) 1886{ 1887 struct data_vio_compression_status status, new_status; 1888 int result; 1889 1890 if (!data_vio_has_allocation(data_vio)) { 1891 /* 1892 * There was no space to write this block and we failed to deduplicate or compress 1893 * it. 1894 */ 1895 continue_data_vio_with_error(data_vio, VDO_NO_SPACE); 1896 return; 1897 } 1898 1899 new_status = (struct data_vio_compression_status) { 1900 .stage = DATA_VIO_POST_PACKER, 1901 .may_not_compress = true, 1902 }; 1903 1904 do { 1905 status = get_data_vio_compression_status(data_vio); 1906 } while ((status.stage != DATA_VIO_POST_PACKER) && 1907 !set_data_vio_compression_status(data_vio, status, new_status)); 1908 1909 /* Write the data from the data block buffer. 
*/ 1910 result = vio_reset_bio(&data_vio->vio, data_vio->vio.data, 1911 write_bio_finished, REQ_OP_WRITE, 1912 data_vio->allocation.pbn); 1913 if (result != VDO_SUCCESS) { 1914 continue_data_vio_with_error(data_vio, result); 1915 return; 1916 } 1917 1918 data_vio->last_async_operation = VIO_ASYNC_OP_WRITE_DATA_VIO; 1919 vdo_submit_data_vio(data_vio); 1920} 1921 1922/** 1923 * acknowledge_write_callback() - Acknowledge a write to the requestor. 1924 * 1925 * This callback is registered in allocate_block() and continue_write_with_block_map_slot(). 1926 */ 1927static void acknowledge_write_callback(struct vdo_completion *completion) 1928{ 1929 struct data_vio *data_vio = as_data_vio(completion); 1930 struct vdo *vdo = completion->vdo; 1931 1932 VDO_ASSERT_LOG_ONLY((!vdo_uses_bio_ack_queue(vdo) || 1933 (vdo_get_callback_thread_id() == vdo->thread_config.bio_ack_thread)), 1934 "%s() called on bio ack queue", __func__); 1935 VDO_ASSERT_LOG_ONLY(data_vio_has_flush_generation_lock(data_vio), 1936 "write VIO to be acknowledged has a flush generation lock"); 1937 acknowledge_data_vio(data_vio); 1938 if (data_vio->new_mapped.pbn == VDO_ZERO_BLOCK) { 1939 /* This is a zero write or discard */ 1940 update_metadata_for_data_vio_write(data_vio, NULL); 1941 return; 1942 } 1943 1944 prepare_for_dedupe(data_vio); 1945} 1946 1947/** 1948 * allocate_block() - Attempt to allocate a block in the current allocation zone. 1949 * 1950 * This callback is registered in continue_write_with_block_map_slot(). 
1951 */ 1952static void allocate_block(struct vdo_completion *completion) 1953{ 1954 struct data_vio *data_vio = as_data_vio(completion); 1955 1956 assert_data_vio_in_allocated_zone(data_vio); 1957 1958 if (!vdo_allocate_block_in_zone(data_vio)) 1959 return; 1960 1961 completion->error_handler = handle_data_vio_error; 1962 WRITE_ONCE(data_vio->allocation_succeeded, true); 1963 data_vio->new_mapped = (struct zoned_pbn) { 1964 .zone = data_vio->allocation.zone, 1965 .pbn = data_vio->allocation.pbn, 1966 .state = VDO_MAPPING_STATE_UNCOMPRESSED, 1967 }; 1968 1969 if (data_vio->fua) { 1970 prepare_for_dedupe(data_vio); 1971 return; 1972 } 1973 1974 data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE; 1975 launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback); 1976} 1977 1978/** 1979 * handle_allocation_error() - Handle an error attempting to allocate a block. 1980 * 1981 * This error handler is registered in continue_write_with_block_map_slot(). 1982 */ 1983static void handle_allocation_error(struct vdo_completion *completion) 1984{ 1985 struct data_vio *data_vio = as_data_vio(completion); 1986 1987 if (completion->result == VDO_NO_SPACE) { 1988 /* We failed to get an allocation, but we can try to dedupe. */ 1989 vdo_reset_completion(completion); 1990 completion->error_handler = handle_data_vio_error; 1991 prepare_for_dedupe(data_vio); 1992 return; 1993 } 1994 1995 /* We got a "real" error, not just a failure to allocate, so fail the request. */ 1996 handle_data_vio_error(completion); 1997} 1998 1999static int assert_is_discard(struct data_vio *data_vio) 2000{ 2001 int result = VDO_ASSERT(data_vio->is_discard, 2002 "data_vio with no block map page is a discard"); 2003 2004 return ((result == VDO_SUCCESS) ? result : VDO_READ_ONLY); 2005} 2006 2007/** 2008 * continue_data_vio_with_block_map_slot() - Read the data_vio's mapping from the block map. 2009 * 2010 * This callback is registered in launch_read_data_vio(). 
2011 */ 2012void continue_data_vio_with_block_map_slot(struct vdo_completion *completion) 2013{ 2014 struct data_vio *data_vio = as_data_vio(completion); 2015 2016 assert_data_vio_in_logical_zone(data_vio); 2017 if (data_vio->read) { 2018 set_data_vio_logical_callback(data_vio, read_block); 2019 data_vio->last_async_operation = VIO_ASYNC_OP_GET_MAPPED_BLOCK_FOR_READ; 2020 vdo_get_mapped_block(data_vio); 2021 return; 2022 } 2023 2024 vdo_acquire_flush_generation_lock(data_vio); 2025 2026 if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) { 2027 /* 2028 * This is a discard for a block on a block map page which has not been allocated, so 2029 * there's nothing more we need to do. 2030 */ 2031 completion->callback = complete_data_vio; 2032 continue_data_vio_with_error(data_vio, assert_is_discard(data_vio)); 2033 return; 2034 } 2035 2036 /* 2037 * We need an allocation if this is neither a full-block discard nor a 2038 * full-block zero write. 2039 */ 2040 if (!data_vio->is_zero && (!data_vio->is_discard || data_vio->is_partial)) { 2041 data_vio_allocate_data_block(data_vio, VIO_WRITE_LOCK, allocate_block, 2042 handle_allocation_error); 2043 return; 2044 } 2045 2046 2047 /* 2048 * We don't need to write any data, so skip allocation and just update the block map and 2049 * reference counts (via the journal). 2050 */ 2051 data_vio->new_mapped.pbn = VDO_ZERO_BLOCK; 2052 if (data_vio->is_zero) 2053 data_vio->new_mapped.state = VDO_MAPPING_STATE_UNCOMPRESSED; 2054 2055 if (data_vio->remaining_discard > VDO_BLOCK_SIZE) { 2056 /* This is not the final block of a discard so we can't acknowledge it yet. */ 2057 update_metadata_for_data_vio_write(data_vio, NULL); 2058 return; 2059 } 2060 2061 data_vio->last_async_operation = VIO_ASYNC_OP_ACKNOWLEDGE_WRITE; 2062 launch_data_vio_on_bio_ack_queue(data_vio, acknowledge_write_callback); 2063} 2064